From 541769fc06fc1344d5fde4355cf629530bdc5dc8 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 25 Mar 2026 09:50:42 +0800 Subject: [PATCH 1/4] [server] Allow enabling lakehouse for pre-bind tables --- .../client/table/LakeEnableTableITCase.java | 100 +++++++++++++++++- .../apache/fluss/config/ConfigOptions.java | 14 ++- .../coordinator/CoordinatorService.java | 10 +- .../coordinator/LakeCatalogDynamicLoader.java | 80 +++++++++++--- .../server/coordinator/MetadataManager.java | 3 - .../fluss/server/DynamicConfigChangeTest.java | 61 +++++++++++ 6 files changed, 244 insertions(+), 24 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java index f4488774e9..f593e4af40 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java @@ -33,9 +33,11 @@ import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; @@ -86,7 +88,9 @@ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throw // Enable datalake format for the cluster admin.alterClusterConfigs( - Collections.singletonList( + Arrays.asList( + new AlterConfig( + DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET), new AlterConfig( DATALAKE_FORMAT.key(), DataLakeFormat.PAIMON.toString(), @@ -177,4 +181,98 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception { TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); 
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); } + + @Test + void testEnableTableAfterClusterEnablesDataLake() throws Exception { + String databaseName = "test_db"; + String tableName = "test_table_before_cluster_enable"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig( + DATALAKE_FORMAT.key(), + DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getDataLakeFormat()).hasValue(DataLakeFormat.PAIMON); + assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse(); + + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("doesn't enable datalake tables"); + + admin.alterClusterConfigs( + Collections.singletonList( + new AlterConfig( + DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET))) + .get(); + admin.alterTable(tablePath, enableDatalakeChange, false).get(); + + TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); + assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); + assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat()) + .hasValue(DataLakeFormat.PAIMON); + } + + @Test + void 
testLegacyClusterCanStillEnableTableLevelDatalake() throws Exception { + String databaseName = "test_db_legacy_enable"; + String tableName = "test_table_legacy_enable"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Collections.singletonList( + // not set DATALAKE_ENABLED to mock legacy cluster + new AlterConfig( + DATALAKE_FORMAT.key(), + DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getDataLakeFormat()).hasValue(DataLakeFormat.PAIMON); + assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse(); + + // make sure we can still enable datalake for the table + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + admin.alterTable(tablePath, enableDatalakeChange, false).get(); + + TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get(); + assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); + assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat()) + .hasValue(DataLakeFormat.PAIMON); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index bcd00b1300..257bf27908 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1993,12 +1993,24 @@ public class ConfigOptions { // 
------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ + public static final ConfigOption<Boolean> DATALAKE_ENABLED = + key("datalake.enabled") + .booleanType() + .noDefaultValue() + .withDescription( + "Whether the Fluss cluster is ready to create and manage lakehouse tables. " + + "If unset, Fluss keeps the legacy behavior where configuring `datalake.format` " + + "also enables lakehouse tables. If set to `false`, Fluss pre-binds the lake format " + + "for newly created tables but does not allow lakehouse tables yet. If set to `true`, " + + "Fluss fully enables lakehouse tables. When this option is explicitly configured, " + + "`datalake.format` must also be configured."); + public static final ConfigOption<DataLakeFormat> DATALAKE_FORMAT = key("datalake.format") .enumType(DataLakeFormat.class) .noDefaultValue() .withDescription( - "The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. " + "The datalake format used by Fluss as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. 
" + "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi."); // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 83016cb52c..c237963038 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -447,8 +447,7 @@ public CompletableFuture createTable(CreateTableRequest req validateTableCreationPermission(tableDescriptor, tablePath); // apply system defaults if the config is not set - tableDescriptor = - applySystemDefaults(tableDescriptor, lakeCatalogContainer.getDataLakeFormat()); + tableDescriptor = applySystemDefaults(tableDescriptor, lakeCatalogContainer); // the distribution and bucket count must be set now //noinspection OptionalGetWithoutIsPresent @@ -560,7 +559,10 @@ public static TablePropertyChanges toTablePropertyChanges(List tabl } private TableDescriptor applySystemDefaults( - TableDescriptor tableDescriptor, DataLakeFormat dataLakeFormat) { + TableDescriptor tableDescriptor, + LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer) { + DataLakeFormat dataLakeFormat = lakeCatalogContainer.getDataLakeFormat(); + boolean clusterDataLakeTableEnabled = lakeCatalogContainer.isClusterDataLakeTableEnabled(); TableDescriptor newDescriptor = tableDescriptor; // not set bucket num @@ -594,7 +596,7 @@ private TableDescriptor applySystemDefaults( // lake table can only be enabled when the cluster configures datalake format boolean dataLakeEnabled = isDataLakeEnabled(tableDescriptor); - if (dataLakeEnabled && dataLakeFormat == null) { + if (dataLakeEnabled && !clusterDataLakeTableEnabled) { throw new InvalidTableException( String.format( "'%s' is enabled for the 
table, but the Fluss cluster doesn't enable datalake tables.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java index db295922b9..1214498f9e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java @@ -33,14 +33,17 @@ import javax.annotation.Nullable; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** - * A dynamic loader for lake catalog. Each time when the datalake format is changed, the lake - * catalog will be changed. + * A dynamic loader for lake catalog. Each time when the effective datalake runtime mode changes, + * the lake catalog will be changed. */ public class LakeCatalogDynamicLoader implements ServerReconfigurable, AutoCloseable { private volatile LakeCatalogContainer lakeCatalogContainer; @@ -63,23 +66,40 @@ public void validate(Configuration newConfig) throws ConfigException { newConfig.getOptional(DATALAKE_FORMAT).isPresent() ? newConfig.get(DATALAKE_FORMAT) : currentConfiguration.get(DATALAKE_FORMAT); + + boolean explicitDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED).orElse(false); + + Optional newDataLakeFormat = newConfig.getOptional(DATALAKE_FORMAT); + Optional effectiveDataLakeFormat = + newDataLakeFormat.isPresent() + ? 
newDataLakeFormat + : currentConfiguration.getOptional(DATALAKE_FORMAT); + + if (explicitDataLakeEnabled && newDatalakeFormat != null) { + throw new ConfigException( + String.format( + "'%s' must be configured when '%s' is explicitly set.", + DATALAKE_FORMAT.key(), DATALAKE_ENABLED.key())); + } + // If datalake format is not set, skip prefix validation so that users can disable or enable // datalake format without re-supplying all datalake-prefixed properties. if (newDatalakeFormat == null) { return; } + String datalakePrefix = "datalake." + effectiveDataLakeFormat.get() + "."; Map configMap = newConfig.toMap(); - String datalakePrefix = "datalake." + newDatalakeFormat + "."; configMap.forEach( (key, value) -> { if (!key.equals(DATALAKE_FORMAT.key()) + && !key.equals(DATALAKE_ENABLED.key()) && key.startsWith("datalake.") && !key.startsWith(datalakePrefix)) { throw new ConfigException( String.format( "Invalid configuration '%s' for '%s' datalake format", - key, newDatalakeFormat)); + key, effectiveDataLakeFormat.get())); } }); } @@ -87,15 +107,29 @@ public void validate(Configuration newConfig) throws ConfigException { @Override public void reconfigure(Configuration newConfig) throws ConfigException { LakeCatalogContainer lastLakeCatalogContainer = lakeCatalogContainer; - DataLakeFormat newLakeFormat = newConfig.getOptional(DATALAKE_FORMAT).orElse(null); - if (newLakeFormat != lastLakeCatalogContainer.dataLakeFormat) { - IOUtils.closeQuietly( - lastLakeCatalogContainer.lakeCatalog, - "Close lake catalog because config changes"); - this.lakeCatalogContainer = - new LakeCatalogContainer(newConfig, pluginManager, isCoordinator); + if (!hasLakeRelevantConfigChanged(currentConfiguration, newConfig)) { this.currentConfiguration = newConfig; + return; } + + LakeCatalogContainer newLakeCatalogContainer = + new LakeCatalogContainer(newConfig, pluginManager, isCoordinator); + IOUtils.closeQuietly( + lastLakeCatalogContainer.lakeCatalog, "Close lake catalog because config 
changes"); + this.lakeCatalogContainer = newLakeCatalogContainer; + this.currentConfiguration = newConfig; + } + + private static boolean hasLakeRelevantConfigChanged( + Configuration currentConfiguration, Configuration newConfig) { + return !extractLakeRelevantConfig(currentConfiguration) + .equals(extractLakeRelevantConfig(newConfig)); + } + + private static Map extractLakeRelevantConfig(Configuration configuration) { + return configuration.toMap().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("datalake.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } public LakeCatalogContainer getLakeCatalogContainer() { @@ -112,6 +146,10 @@ public void close() throws Exception { @Nullable private static LakeCatalog createLakeCatalog(Configuration conf, PluginManager pluginManager) { + if (!LakeCatalogContainer.isClusterDataLakeTableEnabled(conf)) { + return null; + } + DataLakeFormat dataLakeFormat = conf.get(ConfigOptions.DATALAKE_FORMAT); if (dataLakeFormat == null) { return null; @@ -127,7 +165,7 @@ private static LakeCatalog createLakeCatalog(Configuration conf, PluginManager p /** A container for lake catalog. */ public static class LakeCatalogContainer { - // null if the cluster hasn't configured datalake format + private final boolean clusterDataLakeTableEnabled; private final @Nullable DataLakeFormat dataLakeFormat; private final @Nullable LakeCatalog lakeCatalog; private final @Nullable Map defaultTableLakeOptions; @@ -136,19 +174,31 @@ public LakeCatalogContainer( Configuration configuration, @Nullable PluginManager pluginManager, boolean isCoordinator) { + this.clusterDataLakeTableEnabled = isClusterDataLakeTableEnabled(configuration); this.dataLakeFormat = configuration.getOptional(DATALAKE_FORMAT).orElse(null); this.lakeCatalog = isCoordinator ? 
createLakeCatalog(configuration, pluginManager) : null; this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(configuration); - if (isCoordinator && ((dataLakeFormat == null) != (lakeCatalog == null))) { + if (isCoordinator && clusterDataLakeTableEnabled == (lakeCatalog == null)) { throw new ConfigException( String.format( - "dataLakeFormat and lakeCatalog must both be null or both non-null, but dataLakeFormat is %s, lakeCatalog is %s.", - dataLakeFormat, lakeCatalog)); + "clusterDataLakeTableEnabled and lakeCatalog must both be false/null or true/non-null, but clusterDataLakeTableEnabled is %s, lakeCatalog is %s.", + clusterDataLakeTableEnabled, lakeCatalog)); } } + static boolean isClusterDataLakeTableEnabled(Configuration configuration) { + Optional explicitDataLakeEnabled = configuration.getOptional(DATALAKE_ENABLED); + // if datalake.enabled no set, use datalake.format for legacy cluster behavior + return explicitDataLakeEnabled.orElseGet( + () -> configuration.getOptional(DATALAKE_FORMAT).isPresent()); + } + + public boolean isClusterDataLakeTableEnabled() { + return clusterDataLakeTableEnabled; + } + @Nullable public DataLakeFormat getDataLakeFormat() { return dataLakeFormat; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index cc2bb5b702..c9d5e4629e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -515,7 +515,6 @@ public void alterTableProperties( if (newDescriptor != null) { // reuse the same validate logic with the createTable() method validateTableDescriptor(newDescriptor, maxBucketNum); - // pre alter table properties, e.g. 
create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( @@ -563,8 +562,6 @@ private void preAlterTableProperties( + tablePath + " in data lake, because the Fluss cluster doesn't enable datalake tables."); } - - // to enable lake table if (!isDataLakeEnabled(tableDescriptor)) { // before create table in fluss, we may create in lake try { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index b20f39bc59..685966102f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; @@ -528,4 +529,64 @@ public void reconfigure(Configuration newConfig) { // Verify the reconfigurable was notified with the new value assertThat(reconfiguredValue.get()).isEqualTo(2); } + + @Test + void testExplicitDatalakeEnabledRequiresFormat() throws Exception { + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(new Configuration(), null, true)) { + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, new Configuration(), true); + dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.startup(); + + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + DATALAKE_ENABLED.key(), + "false", + AlterConfigOpType.SET)))) + .isInstanceOf(ConfigException.class) + 
.hasMessageContaining( + "'datalake.format' must be configured when 'datalake.enabled' is explicitly set."); + } + } + + @Test + void testPreBindOnlyModeDoesNotCreateLakeCatalog() throws Exception { + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(new Configuration(), null, true)) { + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, new Configuration(), true); + dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.startup(); + + dynamicConfigManager.alterConfigs( + Arrays.asList( + new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET), + new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET), + new AlterConfig( + "datalake.paimon.metastore", + "filesystem", + AlterConfigOpType.SET))); + + assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()) + .isEqualTo(PAIMON); + assertThat( + lakeCatalogDynamicLoader + .getLakeCatalogContainer() + .isClusterDataLakeTableEnabled()) + .isFalse(); + assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog()) + .isNull(); + assertThat( + lakeCatalogDynamicLoader + .getLakeCatalogContainer() + .getDefaultTableLakeOptions()) + .isEqualTo( + Collections.singletonMap( + "table.datalake.paimon.metastore", "filesystem")); + } + } } From 4f17267393659c750a29e7d80e8f661d08de937a Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 25 Mar 2026 12:14:52 +0800 Subject: [PATCH 2/4] add doc --- .../maintenance/observability/quickstart.md | 2 ++ .../operations/updating-configs.md | 20 ++++++++--- .../operations/upgrade-notes-0.10.md | 33 +++++++++++++++++++ .../tiered-storage/lakehouse-storage.md | 2 ++ website/docs/quickstart/lakehouse.md | 4 +++ 5 files changed, 56 insertions(+), 5 deletions(-) diff --git a/website/docs/maintenance/observability/quickstart.md b/website/docs/maintenance/observability/quickstart.md index 8cee873c89..2402c81629 100644 --- 
a/website/docs/maintenance/observability/quickstart.md +++ b/website/docs/maintenance/observability/quickstart.md @@ -94,6 +94,7 @@ services: zookeeper.address: zookeeper:2181 bind.listeners: FLUSS://coordinator-server:9123 remote.data.dir: /tmp/fluss/remote-data + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: /tmp/paimon @@ -118,6 +119,7 @@ services: data.dir: /tmp/fluss/data remote.data.dir: /tmp/fluss/remote-data kv.snapshot.interval: 0s + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: /tmp/paimon diff --git a/website/docs/maintenance/operations/updating-configs.md b/website/docs/maintenance/operations/updating-configs.md index 22cc77fa95..2cde6e7784 100644 --- a/website/docs/maintenance/operations/updating-configs.md +++ b/website/docs/maintenance/operations/updating-configs.md @@ -13,7 +13,8 @@ Fluss allows you to update cluster or table configurations dynamically without r From Fluss version 0.8 onwards, some of the server configs can be updated without restarting the server. Currently, the supported dynamically updatable server configurations include: -- `datalake.format`: Enable lakehouse storage by specifying the lakehouse format, e.g., `paimon`, `iceberg`. +- `datalake.enabled`: Control whether the cluster is ready to create and manage lakehouse tables. When this option is explicitly configured, `datalake.format` must also be configured. +- `datalake.format`: Specify the lakehouse format, e.g., `paimon`, `iceberg`. When enabling lakehouse storage explicitly, use it together with `datalake.enabled = true`. - Options with prefix `datalake.${datalake.format}` - `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. 
Set to a lower value (e.g., 100MB) to limit the rate, or a very high value to effectively disable rate limiting. @@ -27,13 +28,20 @@ Here is a code snippet to demonstrate how to update the cluster configurations u ```java // Enable lakehouse storage with Paimon format admin.alterClusterConfigs( - Collections.singletonList( + Arrays.asList( + new AlterConfig(DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET), + new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); + +// Pre-bind the lakehouse format without enabling lakehouse tables +admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET), new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); -// Disable lakehouse storage +// Return to legacy behavior where configuring datalake.format alone also enables lakehouse tables admin.alterClusterConfigs( Collections.singletonList( - new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE))); + new AlterConfig(DATALAKE_ENABLED.key(), null, AlterConfigOpType.DELETE))); // Set RocksDB shared rate limiter to 200MB/sec admin.alterClusterConfigs( @@ -46,6 +54,8 @@ The `AlterConfig` class contains three properties: * `value`: The configuration value to be set (e.g., `paimon`) * `opType`: The operation type, either `AlterConfigOpType.SET` or `AlterConfigOpType.DELETE` +If `datalake.enabled` is explicitly configured, `datalake.format` must also be configured in the cluster. + ### Using Flink Stored Procedures For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Procedures](engine-flink/procedures.md#cluster-configuration-procedures) for detailed documentation on using `get_cluster_config` and `set_cluster_config` procedures. 
@@ -57,4 +67,4 @@ The connector options on a table including [Storage Options](engine-flink/option ```sql -- Enable lakehouse storage for the given table ALTER TABLE my_table SET ('table.datalake.enabled' = 'true'); -``` \ No newline at end of file +``` diff --git a/website/docs/maintenance/operations/upgrade-notes-0.10.md b/website/docs/maintenance/operations/upgrade-notes-0.10.md index 44820735fd..6f5970124a 100644 --- a/website/docs/maintenance/operations/upgrade-notes-0.10.md +++ b/website/docs/maintenance/operations/upgrade-notes-0.10.md @@ -4,3 +4,36 @@ sidebar_position: 4 --- # Upgrade Notes from v0.9 to v0.10 + +## New `datalake.enabled` Cluster Configuration + +Starting from v0.10, Fluss introduces the cluster-level configuration `datalake.enabled` to control whether the cluster is ready to create and manage lakehouse tables. + +### Behavior Changes + +- If `datalake.enabled` is unset, Fluss keeps the legacy behavior: configuring `datalake.format` alone also enables lakehouse tables. +- If `datalake.enabled = false`, Fluss only pre-binds the lake format for newly created tables and does not allow lakehouse tables yet. +- If `datalake.enabled = true`, Fluss fully enables lakehouse tables. +- If `datalake.enabled` is explicitly configured, `datalake.format` must also be configured. + +### Recommended Configuration + +If you want to enable lakehouse tables on the cluster, configure both options together: + +```yaml +datalake.enabled: true +datalake.format: paimon +``` + +If you only want to pre-bind the lake format without enabling lakehouse tables yet, configure: + +```yaml +datalake.enabled: false +datalake.format: paimon +``` + +### Documentation Updates for Existing Deployments + +If your existing deployment or internal scripts only set `datalake.format`, they will continue to work with the legacy behavior as long as `datalake.enabled` is left unset. 
+ +However, for new configuration examples and operational guidance, prefer configuring `datalake.enabled` explicitly together with `datalake.format`. diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md b/website/docs/maintenance/tiered-storage/lakehouse-storage.md index 2b9e984f33..8f32eb64b8 100644 --- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md +++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md @@ -28,6 +28,7 @@ You can refer to the documentation of the corresponding data lake format integra First, you must configure the lakehouse storage in `server.yaml`. Take Paimon as an example, you must configure the following configurations: ```yaml # Paimon configuration +datalake.enabled: true datalake.format: paimon # the catalog config about Paimon, assuming using Filesystem catalog @@ -39,6 +40,7 @@ Fluss processes Paimon configurations by removing the `datalake.paimon.` prefix For example, if you want to configure to use Hive catalog, you can configure like following: ```yaml +datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: hive datalake.paimon.uri: thrift://: diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index 6223c22d45..39240c61ae 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -111,6 +111,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -137,6 +138,7 @@ services: s3.secret-key: rustfsadmin s3.path.style.access: true kv.snapshot.interval: 0s + datalake.enabled: true datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -328,6 +330,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true 
datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog @@ -362,6 +365,7 @@ services: s3.access-key: rustfsadmin s3.secret-key: rustfsadmin s3.path.style.access: true + datalake.enabled: true datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog From b0a754a05c5c6f6269a1b8523aaf3c95216c1fe7 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 25 Mar 2026 14:15:41 +0800 Subject: [PATCH 3/4] fix comments --- .../client/table/LakeEnableTableITCase.java | 45 +++++++++++++++++++ .../coordinator/LakeCatalogDynamicLoader.java | 23 +++++----- .../server/coordinator/MetadataManager.java | 12 ++++- .../utils/TableDescriptorValidation.java | 27 ++++++++++- 4 files changed, 93 insertions(+), 14 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java index f593e4af40..23bee18279 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.Schema; @@ -182,6 +183,50 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception { assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue(); } + @Test + void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception { + String databaseName = "test_db"; + String tableName = 
"test_table_format_mismatch"; + TablePath tablePath = TablePath.of(databaseName, tableName); + + admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); + admin.alterClusterConfigs( + Collections.singletonList( + new AlterConfig( + DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET))) + .get(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(3, "c1") + .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.ICEBERG) + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig( + DATALAKE_FORMAT.key(), + DataLakeFormat.PAIMON.toString(), + AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET))) + .get(); + + List enableDatalakeChange = + Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true")); + assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("'table.datalake.format' ('iceberg')") + .hasMessageContaining("cluster 'datalake.format' ('paimon')"); + } + @Test void testEnableTableAfterClusterEnablesDataLake() throws Exception { String databaseName = "test_db"; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java index 1214498f9e..15221604f3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java @@ -67,15 +67,16 @@ public void validate(Configuration newConfig) throws ConfigException { ? 
newConfig.get(DATALAKE_FORMAT) : currentConfiguration.get(DATALAKE_FORMAT); + // TODO: validate(...) only sees the merged effective cluster config, so it cannot + // detect the case where a user enables datalake.enabled and unsets + // datalake.format in the same dynamic config change. This may leave the cluster + // with datalake.enabled set but no datalake.format. Fixing this likely requires + // extending the validate/reconfigure framework to expose the incremental change + // set, rather than only the merged result. We accept this for now because + // table-level enablement is still validated, and enabling datalake for a table + // will fail if datalake.format is not configured. boolean explicitDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED).orElse(false); - - Optional newDataLakeFormat = newConfig.getOptional(DATALAKE_FORMAT); - Optional effectiveDataLakeFormat = - newDataLakeFormat.isPresent() - ? newDataLakeFormat - : currentConfiguration.getOptional(DATALAKE_FORMAT); - - if (explicitDataLakeEnabled && newDatalakeFormat != null) { + if (explicitDataLakeEnabled && newDatalakeFormat == null) { throw new ConfigException( String.format( "'%s' must be configured when '%s' is explicitly set.", @@ -88,7 +89,7 @@ public void validate(Configuration newConfig) throws ConfigException { return; } - String datalakePrefix = "datalake." + effectiveDataLakeFormat.get() + "."; + String datalakePrefix = "datalake." 
+ newDatalakeFormat + "."; Map configMap = newConfig.toMap(); configMap.forEach( (key, value) -> { @@ -99,7 +100,7 @@ public void validate(Configuration newConfig) throws ConfigException { throw new ConfigException( String.format( "Invalid configuration '%s' for '%s' datalake format", - key, effectiveDataLakeFormat.get())); + key, newDatalakeFormat)); } }); } @@ -190,7 +191,7 @@ public LakeCatalogContainer( static boolean isClusterDataLakeTableEnabled(Configuration configuration) { Optional explicitDataLakeEnabled = configuration.getOptional(DATALAKE_ENABLED); - // if datalake.enabled no set, use datalake.format for legacy cluster behavior + // if datalake.enabled not set, use datalake.format for legacy cluster behavior return explicitDataLakeEnabled.orElseGet( () -> configuration.getOptional(DATALAKE_FORMAT).isPresent()); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index c9d5e4629e..aa9aa44c83 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -367,7 +367,10 @@ public long createTable( boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException { // validate table properties before creating table - validateTableDescriptor(tableToCreate, maxBucketNum); + validateTableDescriptor( + tableToCreate, + maxBucketNum, + lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()); if (!databaseExists(tablePath.getDatabaseName())) { throw new DatabaseNotExistException( @@ -514,7 +517,10 @@ public void alterTableProperties( if (newDescriptor != null) { // reuse the same validate logic with the createTable() method - validateTableDescriptor(newDescriptor, maxBucketNum); + validateTableDescriptor( + newDescriptor, + maxBucketNum, + 
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()); // pre alter table properties, e.g. create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( @@ -562,6 +568,8 @@ private void preAlterTableProperties( + tablePath + " in data lake, because the Fluss cluster doesn't enable datalake tables."); } + + // to enable lake table if (!isDataLakeEnabled(tableDescriptor)) { // before create table in fluss, we may create in lake try { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 14e4330d7c..6c7a99331a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -28,6 +28,7 @@ import org.apache.fluss.exception.TooManyBucketsException; import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.ChangelogImage; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; @@ -41,6 +42,8 @@ import org.apache.fluss.utils.AutoPartitionStrategy; import org.apache.fluss.utils.StringUtils; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -79,7 +82,10 @@ public class TableDescriptorValidation { Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW); /** Validate table descriptor to create is valid and contain all necessary information. 
*/ - public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) { + public static void validateTableDescriptor( + TableDescriptor tableDescriptor, + int maxBucketNum, + @Nullable DataLakeFormat clusterDataLakeFormat) { Schema schema = tableDescriptor.getSchema(); boolean hasPrimaryKey = schema.getPrimaryKey().isPresent(); Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties()); @@ -118,6 +124,25 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int checkTieredLog(tableConf); checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType()); checkSystemColumns(schema.getRowType()); + checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); + } + + private static void checkTableLakeFormatMatchesCluster( + Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { + Optional tableDataLakeFormat = + tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT); + if (clusterDataLakeFormat != null + && tableDataLakeFormat.isPresent() + && tableDataLakeFormat.get() != clusterDataLakeFormat) { + throw new InvalidConfigException( + String.format( + "'%s' ('%s') must match cluster '%s' ('%s') when '%s' is enabled.", + ConfigOptions.TABLE_DATALAKE_FORMAT.key(), + tableDataLakeFormat.get(), + ConfigOptions.DATALAKE_FORMAT.key(), + clusterDataLakeFormat, + ConfigOptions.TABLE_DATALAKE_ENABLED.key())); + } } public static void validateAlterTableProperties( From 94ebf66feefc80ecfbc3a3309fa13d06bf66cc79 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 25 Mar 2026 15:08:25 +0800 Subject: [PATCH 4/4] refine doc --- .../client/table/LakeEnableTableITCase.java | 13 +++++- .../coordinator/LakeCatalogDynamicLoader.java | 8 ++-- .../utils/TableDescriptorValidation.java | 12 ++++-- .../fluss/server/DynamicConfigChangeTest.java | 43 ++----------------- website/docs/maintenance/configuration.md | 7 +-- .../operations/updating-configs.md | 14 ++---- 
.../operations/upgrade-notes-0.10.md | 27 +++++++----- 7 files changed, 52 insertions(+), 72 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java index 23bee18279..82299806b0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java @@ -32,6 +32,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.DataTypes; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -47,6 +48,16 @@ /** IT case for lake enable table. */ class LakeEnableTableITCase extends ClientToServerITCaseBase { + @BeforeEach + void beforeEach() throws Exception { + admin.alterClusterConfigs( + Arrays.asList( + new AlterConfig(DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET))) + .get(); + } + @Test void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throws Exception { String databaseName = "test_db"; @@ -230,7 +241,7 @@ void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Excep @Test void testEnableTableAfterClusterEnablesDataLake() throws Exception { String databaseName = "test_db"; - String tableName = "test_table_before_cluster_enable"; + String tableName = "test_enable_datalake_table"; TablePath tablePath = TablePath.of(databaseName, tableName); admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java index 15221604f3..93d1600087 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java @@ -75,11 +75,13 @@ public void validate(Configuration newConfig) throws ConfigException { // set, rather than only the merged result. We accept this for now because // table-level enablement is still validated, and enabling datalake for a table // will fail if datalake.format is not configured. - boolean explicitDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED).orElse(false); - if (explicitDataLakeEnabled && newDatalakeFormat == null) { + Optional optDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED); + if (optDataLakeEnabled.isPresent() + && optDataLakeEnabled.get() + && newDatalakeFormat == null) { throw new ConfigException( String.format( - "'%s' must be configured when '%s' is explicitly set.", + "'%s' must be configured when '%s' is explicitly set to true.", DATALAKE_FORMAT.key(), DATALAKE_ENABLED.key())); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 6c7a99331a..05390a5d3d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -129,11 +129,17 @@ public static void validateTableDescriptor( private static void checkTableLakeFormatMatchesCluster( Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { + if (clusterDataLakeFormat == null) { + return; + } + + if (!tableConf.get(ConfigOptions.TABLE_DATALAKE_ENABLED)) { + return; + } + Optional tableDataLakeFormat = tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT); - if (clusterDataLakeFormat != null - && tableDataLakeFormat.isPresent() - && tableDataLakeFormat.get() != 
clusterDataLakeFormat) { + if (tableDataLakeFormat.isPresent() && tableDataLakeFormat.get() != clusterDataLakeFormat) { throw new InvalidConfigException( String.format( "'%s' ('%s') must match cluster '%s' ('%s') when '%s' is enabled.", diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index 685966102f..3ef221ecab 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -531,7 +531,7 @@ public void reconfigure(Configuration newConfig) { } @Test - void testExplicitDatalakeEnabledRequiresFormat() throws Exception { + void testExplicitDataLakeEnabledRequiresDataLakeFormat() throws Exception { try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(new Configuration(), null, true)) { DynamicConfigManager dynamicConfigManager = @@ -545,48 +545,11 @@ void testExplicitDatalakeEnabledRequiresFormat() throws Exception { Collections.singletonList( new AlterConfig( DATALAKE_ENABLED.key(), - "false", + "true", AlterConfigOpType.SET)))) .isInstanceOf(ConfigException.class) .hasMessageContaining( - "'datalake.format' must be configured when 'datalake.enabled' is explicitly set."); - } - } - - @Test - void testPreBindOnlyModeDoesNotCreateLakeCatalog() throws Exception { - try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = - new LakeCatalogDynamicLoader(new Configuration(), null, true)) { - DynamicConfigManager dynamicConfigManager = - new DynamicConfigManager(zookeeperClient, new Configuration(), true); - dynamicConfigManager.register(lakeCatalogDynamicLoader); - dynamicConfigManager.startup(); - - dynamicConfigManager.alterConfigs( - Arrays.asList( - new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET), - new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET), - 
new AlterConfig( - "datalake.paimon.metastore", - "filesystem", - AlterConfigOpType.SET))); - - assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()) - .isEqualTo(PAIMON); - assertThat( - lakeCatalogDynamicLoader - .getLakeCatalogContainer() - .isClusterDataLakeTableEnabled()) - .isFalse(); - assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog()) - .isNull(); - assertThat( - lakeCatalogDynamicLoader - .getLakeCatalogContainer() - .getDefaultTableLakeOptions()) - .isEqualTo( - Collections.singletonMap( - "table.datalake.paimon.metastore", "filesystem")); + "'datalake.format' must be configured when 'datalake.enabled' is explicitly set to true."); } } } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index f73b4e543d..c8688daba1 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -188,9 +188,10 @@ More metrics example could be found in [Observability - Metric Reporters](observ | metrics.reporter.prometheus-push.grouping-key | String | (None) | Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. | ## Lakehouse -| Option | Type | Default | Description | -|-----------------|------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| datalake.format | Enum | (None) | The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. 
|
+| Option | Type | Default | Description |
+|------------------|---------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| datalake.enabled | Boolean | (None) | Whether the Fluss cluster is ready to create and manage lakehouse tables. If unset, Fluss keeps the legacy behavior where configuring `datalake.format` also enables lakehouse tables. If set to `false`, Fluss pre-binds the lake format for newly created tables but does not allow lakehouse tables yet. If set to `true`, Fluss fully enables lakehouse tables. When this option is explicitly configured to true, `datalake.format` must also be configured. |
+| datalake.format | Enum | (None) | The datalake format used by Fluss as the lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi. |
 
 ## Kafka
 
diff --git a/website/docs/maintenance/operations/updating-configs.md b/website/docs/maintenance/operations/updating-configs.md
index 2cde6e7784..b44267a1d1 100644
--- a/website/docs/maintenance/operations/updating-configs.md
+++ b/website/docs/maintenance/operations/updating-configs.md
@@ -13,7 +13,7 @@ Fluss allows you to update cluster or table configurations dynamically without r
 
 From Fluss version 0.8 onwards, some of the server configs can be updated without restarting the server. Currently, the supported dynamically updatable server configurations include:
 
-- `datalake.enabled`: Control whether the cluster is ready to create and manage lakehouse tables.
When this option is explicitly configured, `datalake.format` must also be configured. +- `datalake.enabled`: Control whether the cluster is ready to create and manage lakehouse tables. When this option is explicitly configured to true, `datalake.format` must also be configured. - `datalake.format`: Specify the lakehouse format, e.g., `paimon`, `iceberg`. When enabling lakehouse storage explicitly, use it together with `datalake.enabled = true`. - Options with prefix `datalake.${datalake.format}` - `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. Set to a lower value (e.g., 100MB) to limit the rate, or a very high value to effectively disable rate limiting. @@ -32,16 +32,10 @@ admin.alterClusterConfigs( new AlterConfig(DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET), new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); -// Pre-bind the lakehouse format without enabling lakehouse tables -admin.alterClusterConfigs( - Arrays.asList( - new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET), - new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); - -// Return to legacy behavior where configuring datalake.format alone also enables lakehouse tables +// Disable lakehouse storage admin.alterClusterConfigs( Collections.singletonList( - new AlterConfig(DATALAKE_ENABLED.key(), null, AlterConfigOpType.DELETE))); + new AlterConfig(DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET))); // Set RocksDB shared rate limiter to 200MB/sec admin.alterClusterConfigs( @@ -54,8 +48,6 @@ The `AlterConfig` class contains three properties: * `value`: The configuration value to be set (e.g., `paimon`) * `opType`: The operation type, either `AlterConfigOpType.SET` or `AlterConfigOpType.DELETE` -If `datalake.enabled` is explicitly configured, `datalake.format` must also be configured in 
the cluster. - ### Using Flink Stored Procedures For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Procedures](engine-flink/procedures.md#cluster-configuration-procedures) for detailed documentation on using `get_cluster_config` and `set_cluster_config` procedures. diff --git a/website/docs/maintenance/operations/upgrade-notes-0.10.md b/website/docs/maintenance/operations/upgrade-notes-0.10.md index 6f5970124a..4681bcd5ff 100644 --- a/website/docs/maintenance/operations/upgrade-notes-0.10.md +++ b/website/docs/maintenance/operations/upgrade-notes-0.10.md @@ -5,35 +5,40 @@ sidebar_position: 4 # Upgrade Notes from v0.9 to v0.10 -## New `datalake.enabled` Cluster Configuration +## Cluster Configuration Changes -Starting from v0.10, Fluss introduces the cluster-level configuration `datalake.enabled` to control whether the cluster is ready to create and manage lakehouse tables. +### New `datalake.enabled` Cluster Configuration -### Behavior Changes +Starting in v0.10, Fluss introduces the cluster-level configuration `datalake.enabled` to control whether the cluster is ready to create and manage lakehouse tables. -- If `datalake.enabled` is unset, Fluss keeps the legacy behavior: configuring `datalake.format` alone also enables lakehouse tables. -- If `datalake.enabled = false`, Fluss only pre-binds the lake format for newly created tables and does not allow lakehouse tables yet. +#### Behavior Changes + +- If `datalake.enabled` is unset, Fluss preserves the legacy behavior: configuring `datalake.format` alone also enables lakehouse tables. +- If `datalake.enabled = false`, Fluss pre-binds the lake format for newly created tables but does not allow lakehouse tables yet. - If `datalake.enabled = true`, Fluss fully enables lakehouse tables. - If `datalake.enabled` is explicitly configured, `datalake.format` must also be configured. 
-### Recommended Configuration +#### Recommended Configuration -If you want to enable lakehouse tables on the cluster, configure both options together: +To enable lakehouse tables for the cluster, configure both options together: ```yaml datalake.enabled: true datalake.format: paimon ``` -If you only want to pre-bind the lake format without enabling lakehouse tables yet, configure: +To pre-bind the lake format without enabling lakehouse tables yet, configure: ```yaml datalake.enabled: false datalake.format: paimon ``` -### Documentation Updates for Existing Deployments +This mode is useful when you want newly created tables to carry the lake format in advance, while postponing lakehouse enablement at the cluster level. +After `datalake.enabled` is later set to `true`, tables created under this configuration can still turn on `table.datalake.enabled` without being recreated. + +#### Notes for Existing Deployments -If your existing deployment or internal scripts only set `datalake.format`, they will continue to work with the legacy behavior as long as `datalake.enabled` is left unset. +If your existing deployment or internal scripts only set `datalake.format`, they will continue to work with the legacy behavior as long as `datalake.enabled` remains unset. -However, for new configuration examples and operational guidance, prefer configuring `datalake.enabled` explicitly together with `datalake.format`. +For new configuration examples and operational guidance, we recommend explicitly configuring `datalake.enabled` together with `datalake.format`.