diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java index f4488774e9..82299806b0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.Schema; @@ -31,11 +32,14 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.DataTypes; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; @@ -44,6 +48,16 @@ /** IT case for lake enable table. 
*/ class LakeEnableTableITCase extends ClientToServerITCaseBase { + @BeforeEach + void beforeEach() throws Exception { + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig(DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET))) + .get(); + } + @Test void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throws Exception { String databaseName = "test_db"; @@ -86,7 +100,9 @@ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throw // Enable datalake format for the cluster admin.alterClusterConfigs( - Collections.singletonList( + Arrays.asList( + new AlterConfig( + DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET), new AlterConfig( DATALAKE_FORMAT.key(), DataLakeFormat.PAIMON.toString(), @@ -177,4 +193,142 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception { TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); } + + @Test + void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception { + String databaseName = "test_db"; + String tableName = "test_table_format_mismatch"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Collections.singletonList( + new AlterConfig( + DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.ICEBERG) + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig( + DATALAKE_FORMAT.key(), + 
DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET))) + .get(); + + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("'table.datalake.format' ('iceberg')") + .hasMessageContaining("cluster 'datalake.format' ('paimon')"); + } + + @Test + void testEnableTableAfterClusterEnablesDataLake() throws Exception { + String databaseName = "test_db"; + String tableName = "test_enable_datalake_table"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig( + DATALAKE_FORMAT.key(), + DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getDataLakeFormat()).hasValue(DataLakeFormat.PAIMON); + assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse(); + + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("doesn't enable datalake tables"); + + admin.alterClusterConfigs( + Collections.singletonList( + new 
AlterConfig( + DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET))) + .get(); + admin.alterTable(tablePath, enableDatalakeChange, false).get(); + + TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); + assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); + assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat()) + .hasValue(DataLakeFormat.PAIMON); + } + + @Test + void testLegacyClusterCanStillEnableTableLevelDatalake() throws Exception { + String databaseName = "test_db_legacy_enable"; + String tableName = "test_table_legacy_enable"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Collections.singletonList( + // not set DATALAKE_ENABLED to mock legacy cluster + new AlterConfig( + DATALAKE_FORMAT.key(), + DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getDataLakeFormat()).hasValue(DataLakeFormat.PAIMON); + assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse(); + + // make sure we can still enable datalake for the table + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + admin.alterTable(tablePath, enableDatalakeChange, false).get(); + + TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); + assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); + assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat()) + .hasValue(DataLakeFormat.PAIMON); + } } diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index bcd00b1300..257bf27908 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1993,12 +1993,24 @@ public class ConfigOptions { // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ + public static final ConfigOption DATALAKE_ENABLED = + key("datalake.enabled") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether the Fluss cluster is ready to create and manage lakehouse tables. " + + "If unset, Fluss keeps the legacy behavior where configuring `datalake.format` " + + "also enables lakehouse tables. If set to `false`, Fluss pre-binds the lake format " + + "for newly created tables but does not allow lakehouse tables yet. If set to `true`, " + + "Fluss fully enables lakehouse tables. When this option is explicitly configured to true, " + + "`datalake.format` must also be configured."); + public static final ConfigOption DATALAKE_FORMAT = key("datalake.format") .enumType(DataLakeFormat.class) .noDefaultValue() .withDescription( - "The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. " + "The datalake format used by Fluss as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. 
" + "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi."); // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 83016cb52c..c237963038 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -447,8 +447,7 @@ public CompletableFuture createTable(CreateTableRequest req validateTableCreationPermission(tableDescriptor, tablePath); // apply system defaults if the config is not set - tableDescriptor = - applySystemDefaults(tableDescriptor, lakeCatalogContainer.getDataLakeFormat()); + tableDescriptor = applySystemDefaults(tableDescriptor, lakeCatalogContainer); // the distribution and bucket count must be set now //noinspection OptionalGetWithoutIsPresent @@ -560,7 +559,10 @@ public static TablePropertyChanges toTablePropertyChanges(List tabl } private TableDescriptor applySystemDefaults( - TableDescriptor tableDescriptor, DataLakeFormat dataLakeFormat) { + TableDescriptor tableDescriptor, + LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer) { + DataLakeFormat dataLakeFormat = lakeCatalogContainer.getDataLakeFormat(); + boolean clusterDataLakeTableEnabled = lakeCatalogContainer.isClusterDataLakeTableEnabled(); TableDescriptor newDescriptor = tableDescriptor; // not set bucket num @@ -594,7 +596,7 @@ private TableDescriptor applySystemDefaults( // lake table can only be enabled when the cluster configures datalake format boolean dataLakeEnabled = isDataLakeEnabled(tableDescriptor); - if (dataLakeEnabled && dataLakeFormat == null) { + if (dataLakeEnabled && !clusterDataLakeTableEnabled) { throw new InvalidTableException( String.format( "'%s' is enabled for the 
table, but the Fluss cluster doesn't enable datalake tables.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java index db295922b9..93d1600087 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java @@ -33,14 +33,17 @@ import javax.annotation.Nullable; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** - * A dynamic loader for lake catalog. Each time when the datalake format is changed, the lake - * catalog will be changed. + * A dynamic loader for lake catalog. Each time when the effective datalake runtime mode changes, + * the lake catalog will be changed. */ public class LakeCatalogDynamicLoader implements ServerReconfigurable, AutoCloseable { private volatile LakeCatalogContainer lakeCatalogContainer; @@ -63,17 +66,37 @@ public void validate(Configuration newConfig) throws ConfigException { newConfig.getOptional(DATALAKE_FORMAT).isPresent() ? newConfig.get(DATALAKE_FORMAT) : currentConfiguration.get(DATALAKE_FORMAT); + + // TODO: validate(...) only sees the merged effective cluster config, so it cannot + // detect the case where a user enables datalake.enabled and unsets + // datalake.format in the same dynamic config change. This may leave the cluster + // with datalake.enabled set but no datalake.format. 
Fixing this likely requires + // extending the validate/reconfigure framework to expose the incremental change + // set, rather than only the merged result. We accept this for now because + // table-level enablement is still validated, and enabling datalake for a table + // will fail if datalake.format is not configured. + Optional optDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED); + if (optDataLakeEnabled.isPresent() + && optDataLakeEnabled.get() + && newDatalakeFormat == null) { + throw new ConfigException( + String.format( + "'%s' must be configured when '%s' is explicitly set to true.", + DATALAKE_FORMAT.key(), DATALAKE_ENABLED.key())); + } + // If datalake format is not set, skip prefix validation so that users can disable or enable // datalake format without re-supplying all datalake-prefixed properties. if (newDatalakeFormat == null) { return; } - Map configMap = newConfig.toMap(); String datalakePrefix = "datalake." + newDatalakeFormat + "."; + Map configMap = newConfig.toMap(); configMap.forEach( (key, value) -> { if (!key.equals(DATALAKE_FORMAT.key()) + && !key.equals(DATALAKE_ENABLED.key()) && key.startsWith("datalake.") && !key.startsWith(datalakePrefix)) { throw new ConfigException( @@ -87,15 +110,29 @@ public void validate(Configuration newConfig) throws ConfigException { @Override public void reconfigure(Configuration newConfig) throws ConfigException { LakeCatalogContainer lastLakeCatalogContainer = lakeCatalogContainer; - DataLakeFormat newLakeFormat = newConfig.getOptional(DATALAKE_FORMAT).orElse(null); - if (newLakeFormat != lastLakeCatalogContainer.dataLakeFormat) { - IOUtils.closeQuietly( - lastLakeCatalogContainer.lakeCatalog, - "Close lake catalog because config changes"); - this.lakeCatalogContainer = - new LakeCatalogContainer(newConfig, pluginManager, isCoordinator); + if (!hasLakeRelevantConfigChanged(currentConfiguration, newConfig)) { this.currentConfiguration = newConfig; + return; } + + LakeCatalogContainer 
newLakeCatalogContainer = + new LakeCatalogContainer(newConfig, pluginManager, isCoordinator); + IOUtils.closeQuietly( + lastLakeCatalogContainer.lakeCatalog, "Close lake catalog because config changes"); + this.lakeCatalogContainer = newLakeCatalogContainer; + this.currentConfiguration = newConfig; + } + + private static boolean hasLakeRelevantConfigChanged( + Configuration currentConfiguration, Configuration newConfig) { + return !extractLakeRelevantConfig(currentConfiguration) + .equals(extractLakeRelevantConfig(newConfig)); + } + + private static Map extractLakeRelevantConfig(Configuration configuration) { + return configuration.toMap().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("datalake.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } public LakeCatalogContainer getLakeCatalogContainer() { @@ -112,6 +149,10 @@ public void close() throws Exception { @Nullable private static LakeCatalog createLakeCatalog(Configuration conf, PluginManager pluginManager) { + if (!LakeCatalogContainer.isClusterDataLakeTableEnabled(conf)) { + return null; + } + DataLakeFormat dataLakeFormat = conf.get(ConfigOptions.DATALAKE_FORMAT); if (dataLakeFormat == null) { return null; @@ -127,7 +168,7 @@ private static LakeCatalog createLakeCatalog(Configuration conf, PluginManager p /** A container for lake catalog. 
*/ public static class LakeCatalogContainer { - // null if the cluster hasn't configured datalake format + private final boolean clusterDataLakeTableEnabled; private final @Nullable DataLakeFormat dataLakeFormat; private final @Nullable LakeCatalog lakeCatalog; private final @Nullable Map defaultTableLakeOptions; @@ -136,19 +177,31 @@ public LakeCatalogContainer( Configuration configuration, @Nullable PluginManager pluginManager, boolean isCoordinator) { + this.clusterDataLakeTableEnabled = isClusterDataLakeTableEnabled(configuration); this.dataLakeFormat = configuration.getOptional(DATALAKE_FORMAT).orElse(null); this.lakeCatalog = isCoordinator ? createLakeCatalog(configuration, pluginManager) : null; this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(configuration); - if (isCoordinator && ((dataLakeFormat == null) != (lakeCatalog == null))) { + if (isCoordinator && clusterDataLakeTableEnabled == (lakeCatalog == null)) { throw new ConfigException( String.format( - "dataLakeFormat and lakeCatalog must both be null or both non-null, but dataLakeFormat is %s, lakeCatalog is %s.", - dataLakeFormat, lakeCatalog)); + "clusterDataLakeTableEnabled and lakeCatalog must both be false/null or true/non-null, but clusterDataLakeTableEnabled is %s, lakeCatalog is %s.", + clusterDataLakeTableEnabled, lakeCatalog)); } } + static boolean isClusterDataLakeTableEnabled(Configuration configuration) { + Optional explicitDataLakeEnabled = configuration.getOptional(DATALAKE_ENABLED); + // if datalake.enabled not set, use datalake.format for legacy cluster behavior + return explicitDataLakeEnabled.orElseGet( + () -> configuration.getOptional(DATALAKE_FORMAT).isPresent()); + } + + public boolean isClusterDataLakeTableEnabled() { + return clusterDataLakeTableEnabled; + } + @Nullable public DataLakeFormat getDataLakeFormat() { return dataLakeFormat; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index cc2bb5b702..aa9aa44c83 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -367,7 +367,10 @@ public long createTable( boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException { // validate table properties before creating table - validateTableDescriptor(tableToCreate, maxBucketNum); + validateTableDescriptor( + tableToCreate, + maxBucketNum, + lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()); if (!databaseExists(tablePath.getDatabaseName())) { throw new DatabaseNotExistException( @@ -514,8 +517,10 @@ public void alterTableProperties( if (newDescriptor != null) { // reuse the same validate logic with the createTable() method - validateTableDescriptor(newDescriptor, maxBucketNum); - + validateTableDescriptor( + newDescriptor, + maxBucketNum, + lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()); // pre alter table properties, e.g. 
create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 14e4330d7c..05390a5d3d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -28,6 +28,7 @@ import org.apache.fluss.exception.TooManyBucketsException; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.ChangelogImage; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; @@ -41,6 +42,8 @@ import org.apache.fluss.utils.AutoPartitionStrategy; import org.apache.fluss.utils.StringUtils; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -79,7 +82,10 @@ public class TableDescriptorValidation { Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW); /** Validate table descriptor to create is valid and contain all necessary information. 
*/ - public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) { + public static void validateTableDescriptor( + TableDescriptor tableDescriptor, + int maxBucketNum, + @Nullable DataLakeFormat clusterDataLakeFormat) { Schema schema = tableDescriptor.getSchema(); boolean hasPrimaryKey = schema.getPrimaryKey().isPresent(); Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties()); @@ -118,6 +124,31 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int checkTieredLog(tableConf); checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType()); checkSystemColumns(schema.getRowType()); + checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); + } + + private static void checkTableLakeFormatMatchesCluster( + Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { + if (clusterDataLakeFormat == null) { + return; + } + + if (!tableConf.get(ConfigOptions.TABLE_DATALAKE_ENABLED)) { + return; + } + + Optional tableDataLakeFormat = + tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT); + if (tableDataLakeFormat.isPresent() && tableDataLakeFormat.get() != clusterDataLakeFormat) { + throw new InvalidConfigException( + String.format( + "'%s' ('%s') must match cluster '%s' ('%s') when '%s' is enabled.", + ConfigOptions.TABLE_DATALAKE_FORMAT.key(), + tableDataLakeFormat.get(), + ConfigOptions.DATALAKE_FORMAT.key(), + clusterDataLakeFormat, + ConfigOptions.TABLE_DATALAKE_ENABLED.key())); + } } public static void validateAlterTableProperties( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index b20f39bc59..3ef221ecab 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -45,6 +45,7 @@ 
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; @@ -528,4 +529,27 @@ public void reconfigure(Configuration newConfig) { // Verify the reconfigurable was notified with the new value assertThat(reconfiguredValue.get()).isEqualTo(2); } + + @Test + void testExplicitDataLakeEnabledRequiresDataLakeFormat() throws Exception { + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(new Configuration(), null, true)) { + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, new Configuration(), true); + dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.startup(); + + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + DATALAKE_ENABLED.key(), + "true", + AlterConfigOpType.SET)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining( + "'datalake.format' must be configured when 'datalake.enabled' is explicitly set to true."); + } + } } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index f73b4e543d..c8688daba1 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -188,9 +188,10 @@ More metrics example could be found in [Observability - Metric Reporters](observ | metrics.reporter.prometheus-push.grouping-key | String | (None) | Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. 
| ## Lakehouse -| Option | Type | Default | Description | -|-----------------|------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| datalake.format | Enum | (None) | The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. | +| Option | Type | Default | Description | +|------------------|---------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| datalake.enabled | Boolean | (None) | Whether the Fluss cluster is ready to create and manage lakehouse tables. If unset, Fluss keeps the legacy behavior where configuring `datalake.format` also enables lakehouse tables. If set to `false`, Fluss pre-binds the lake format for newly created tables but does not allow lakehouse tables yet. If set to `true`, Fluss fully enables lakehouse tables. When this option is explicitly configured to true, `datalake.format` must also be configured. | +| datalake.format | Enum | (None) | The datalake format used by Fluss as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. 
| ## Kafka diff --git a/website/docs/maintenance/observability/quickstart.md b/website/docs/maintenance/observability/quickstart.md index 8cee873c89..2402c81629 100644 --- a/website/docs/maintenance/observability/quickstart.md +++ b/website/docs/maintenance/observability/quickstart.md @@ -94,6 +94,7 @@ services: zookeeper.address: zookeeper:2181 bind.listeners: FLUSS://coordinator-server:9123 remote.data.dir: /tmp/fluss/remote-data + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: /tmp/paimon @@ -118,6 +119,7 @@ services: data.dir: /tmp/fluss/data remote.data.dir: /tmp/fluss/remote-data kv.snapshot.interval: 0s + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: /tmp/paimon diff --git a/website/docs/maintenance/operations/updating-configs.md b/website/docs/maintenance/operations/updating-configs.md index 22cc77fa95..b44267a1d1 100644 --- a/website/docs/maintenance/operations/updating-configs.md +++ b/website/docs/maintenance/operations/updating-configs.md @@ -13,7 +13,8 @@ Fluss allows you to update cluster or table configurations dynamically without r From Fluss version 0.8 onwards, some of the server configs can be updated without restarting the server. Currently, the supported dynamically updatable server configurations include: -- `datalake.format`: Enable lakehouse storage by specifying the lakehouse format, e.g., `paimon`, `iceberg`. +- `datalake.enabled`: Control whether the cluster is ready to create and manage lakehouse tables. When this option is explicitly configured to true, `datalake.format` must also be configured. +- `datalake.format`: Specify the lakehouse format, e.g., `paimon`, `iceberg`. When enabling lakehouse storage explicitly, use it together with `datalake.enabled = true`. 
- Options with prefix `datalake.${datalake.format}` - `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. Set to a lower value (e.g., 100MB) to limit the rate, or a very high value to effectively disable rate limiting. @@ -27,13 +28,14 @@ Here is a code snippet to demonstrate how to update the cluster configurations u ```java // Enable lakehouse storage with Paimon format admin.alterClusterConfigs( - Collections.singletonList( + Arrays.asList( + new AlterConfig(DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET), new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); // Disable lakehouse storage admin.alterClusterConfigs( Collections.singletonList( - new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE))); + new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET))); // Set RocksDB shared rate limiter to 200MB/sec admin.alterClusterConfigs( @@ -57,4 +59,4 @@ The connector options on a table including [Storage Options](engine-flink/option ```sql -- Enable lakehouse storage for the given table ALTER TABLE my_table SET ('table.datalake.enabled' = 'true'); -``` \ No newline at end of file +``` diff --git a/website/docs/maintenance/operations/upgrade-notes-0.10.md b/website/docs/maintenance/operations/upgrade-notes-0.10.md index 44820735fd..4681bcd5ff 100644 --- a/website/docs/maintenance/operations/upgrade-notes-0.10.md +++ b/website/docs/maintenance/operations/upgrade-notes-0.10.md @@ -4,3 +4,41 @@ sidebar_position: 4 --- # Upgrade Notes from v0.9 to v0.10 + +## Cluster Configuration Changes + +### New `datalake.enabled` Cluster Configuration + +Starting in v0.10, Fluss introduces the cluster-level configuration `datalake.enabled` to control whether the cluster is ready to create and manage lakehouse tables. 
+ +#### Behavior Changes + +- If `datalake.enabled` is unset, Fluss preserves the legacy behavior: configuring `datalake.format` alone also enables lakehouse tables. +- If `datalake.enabled = false`, Fluss pre-binds the lake format for newly created tables but does not allow lakehouse tables yet. +- If `datalake.enabled = true`, Fluss fully enables lakehouse tables. +- If `datalake.enabled` is explicitly set to `true`, `datalake.format` must also be configured. + +#### Recommended Configuration + +To enable lakehouse tables for the cluster, configure both options together: + +```yaml +datalake.enabled: true +datalake.format: paimon +``` + +To pre-bind the lake format without enabling lakehouse tables yet, configure: + +```yaml +datalake.enabled: false +datalake.format: paimon +``` + +This mode is useful when you want newly created tables to carry the lake format in advance, while postponing lakehouse enablement at the cluster level. +After `datalake.enabled` is later set to `true`, tables created under this configuration can still turn on `table.datalake.enabled` without being recreated. + +#### Notes for Existing Deployments + +If your existing deployment or internal scripts only set `datalake.format`, they will continue to work with the legacy behavior as long as `datalake.enabled` remains unset. + +For new configuration examples and operational guidance, we recommend explicitly configuring `datalake.enabled` together with `datalake.format`. diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md b/website/docs/maintenance/tiered-storage/lakehouse-storage.md index 2b9e984f33..8f32eb64b8 100644 --- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md +++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md @@ -28,6 +28,7 @@ You can refer to the documentation of the corresponding data lake format integra First, you must configure the lakehouse storage in `server.yaml`. 
Take Paimon as an example, you must configure the following configurations: ```yaml # Paimon configuration +datalake.enabled: true datalake.format: paimon # the catalog config about Paimon, assuming using Filesystem catalog @@ -39,6 +40,7 @@ Fluss processes Paimon configurations by removing the `datalake.paimon.` prefix For example, if you want to configure to use Hive catalog, you can configure like following: ```yaml +datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: hive datalake.paimon.uri: thrift://: diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index 6223c22d45..39240c61ae 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -111,6 +111,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -137,6 +138,7 @@ services: s3.secret-key: rustfsadmin s3.path.style.access: true kv.snapshot.interval: 0s + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -328,6 +330,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog @@ -362,6 +365,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog