Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.Schema;
Expand All @@ -31,11 +32,14 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.fluss.config.ConfigOptions.DATALAKE_ENABLED;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -44,6 +48,16 @@
/** IT case for lake enable table. */
class LakeEnableTableITCase extends ClientToServerITCaseBase {

@BeforeEach
void beforeEach() throws Exception {
    // Reset lake-related cluster configs so every test starts from a clean slate.
    List<AlterConfig> resetLakeConfigs =
            Arrays.asList(
                    new AlterConfig(DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET),
                    new AlterConfig(DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET));
    admin.alterClusterConfigs(resetLakeConfigs).get();
}

@Test
void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throws Exception {
String databaseName = "test_db";
Expand Down Expand Up @@ -86,7 +100,9 @@ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throw

// Enable datalake format for the cluster
admin.alterClusterConfigs(
Collections.singletonList(
Arrays.asList(
new AlterConfig(
DATALAKE_ENABLED.key(), null, AlterConfigOpType.SET),
new AlterConfig(
DATALAKE_FORMAT.key(),
DataLakeFormat.PAIMON.toString(),
Expand Down Expand Up @@ -177,4 +193,142 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
}

@Test
void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception {
    // A table pinned to Iceberg must not be lake-enabled on a Paimon cluster.
    String databaseName = "test_db";
    TablePath tablePath = TablePath.of(databaseName, "test_table_format_mismatch");

    admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();

    // Clear any cluster-level lake format before the table is created.
    admin.alterClusterConfigs(
                    Collections.singletonList(
                            new AlterConfig(
                                    DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
            .get();

    Schema schema =
            Schema.newBuilder()
                    .column("c1", DataTypes.INT())
                    .column("c2", DataTypes.STRING())
                    .build();
    TableDescriptor icebergTable =
            TableDescriptor.builder()
                    .schema(schema)
                    .distributedBy(3, "c1")
                    .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.ICEBERG)
                    .build();
    admin.createTable(tablePath, icebergTable, false).get();

    // Now enable the lake on the cluster with a different (Paimon) format.
    admin.alterClusterConfigs(
                    Arrays.asList(
                            new AlterConfig(
                                    DATALAKE_FORMAT.key(),
                                    DataLakeFormat.PAIMON.toString(),
                                    AlterConfigOpType.SET),
                            new AlterConfig(
                                    DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET)))
            .get();

    // Enabling the table must fail with a format-mismatch error.
    List<TableChange> enableLakeChange =
            Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
    assertThatThrownBy(() -> admin.alterTable(tablePath, enableLakeChange, false).get())
            .cause()
            .isInstanceOf(InvalidConfigException.class)
            .hasMessageContaining("'table.datalake.format' ('iceberg')")
            .hasMessageContaining("cluster 'datalake.format' ('paimon')");
}

@Test
void testEnableTableAfterClusterEnablesDataLake() throws Exception {
    // A table created while datalake.enabled=false inherits the cluster's lake
    // format but can only be lake-enabled once the cluster flips the flag to true.
    String databaseName = "test_db";
    TablePath tablePath = TablePath.of(databaseName, "test_enable_datalake_table");

    admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();

    // Pre-bind Paimon as the lake format but keep lakehouse tables disabled.
    admin.alterClusterConfigs(
                    Arrays.asList(
                            new AlterConfig(
                                    DATALAKE_FORMAT.key(),
                                    DataLakeFormat.PAIMON.toString(),
                                    AlterConfigOpType.SET),
                            new AlterConfig(
                                    DATALAKE_ENABLED.key(), "false", AlterConfigOpType.SET)))
            .get();

    Schema schema =
            Schema.newBuilder()
                    .column("c1", DataTypes.INT())
                    .column("c2", DataTypes.STRING())
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder().schema(schema).distributedBy(3, "c1").build();
    admin.createTable(tablePath, descriptor, false).get();

    // The format is pre-bound, but the table is not lake-enabled yet.
    TableInfo createdInfo = admin.getTableInfo(tablePath).get();
    assertThat(createdInfo.getTableConfig().getDataLakeFormat())
            .hasValue(DataLakeFormat.PAIMON);
    assertThat(createdInfo.getTableConfig().isDataLakeEnabled()).isFalse();

    // Enabling the table while the cluster flag is false must be rejected.
    List<TableChange> enableLakeChange =
            Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
    assertThatThrownBy(() -> admin.alterTable(tablePath, enableLakeChange, false).get())
            .cause()
            .isInstanceOf(InvalidAlterTableException.class)
            .hasMessageContaining("doesn't enable datalake tables");

    // After the cluster enables the lake, the same alteration succeeds.
    admin.alterClusterConfigs(
                    Collections.singletonList(
                            new AlterConfig(
                                    DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET)))
            .get();
    admin.alterTable(tablePath, enableLakeChange, false).get();

    TableInfo updatedInfo = admin.getTableInfo(tablePath).get();
    assertThat(updatedInfo.getTableConfig().isDataLakeEnabled()).isTrue();
    assertThat(updatedInfo.getTableConfig().getDataLakeFormat())
            .hasValue(DataLakeFormat.PAIMON);
}

@Test
void testLegacyClusterCanStillEnableTableLevelDatalake() throws Exception {
    // Legacy behavior: a cluster that sets only datalake.format (leaving
    // datalake.enabled unset) still allows table-level lake enablement.
    String databaseName = "test_db_legacy_enable";
    TablePath tablePath = TablePath.of(databaseName, "test_table_legacy_enable");

    admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();

    // Deliberately do NOT set DATALAKE_ENABLED, to mock a legacy cluster.
    admin.alterClusterConfigs(
                    Collections.singletonList(
                            new AlterConfig(
                                    DATALAKE_FORMAT.key(),
                                    DataLakeFormat.PAIMON.toString(),
                                    AlterConfigOpType.SET)))
            .get();

    Schema schema =
            Schema.newBuilder()
                    .column("c1", DataTypes.INT())
                    .column("c2", DataTypes.STRING())
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder().schema(schema).distributedBy(3, "c1").build();
    admin.createTable(tablePath, descriptor, false).get();

    // The table picks up the cluster's lake format but starts lake-disabled.
    TableInfo createdInfo = admin.getTableInfo(tablePath).get();
    assertThat(createdInfo.getTableConfig().getDataLakeFormat())
            .hasValue(DataLakeFormat.PAIMON);
    assertThat(createdInfo.getTableConfig().isDataLakeEnabled()).isFalse();

    // Enabling the lake at table level must still succeed on a legacy cluster.
    List<TableChange> enableLakeChange =
            Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
    admin.alterTable(tablePath, enableLakeChange, false).get();

    TableInfo updatedInfo = admin.getTableInfo(tablePath).get();
    assertThat(updatedInfo.getTableConfig().isDataLakeEnabled()).isTrue();
    assertThat(updatedInfo.getTableConfig().getDataLakeFormat())
            .hasValue(DataLakeFormat.PAIMON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1993,12 +1993,24 @@ public class ConfigOptions {
// ------------------------------------------------------------------------
// ConfigOptions for lakehouse storage
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> DATALAKE_ENABLED =
key("datalake.enabled")
.booleanType()
.noDefaultValue()
.withDescription(
"Whether the Fluss cluster is ready to create and manage lakehouse tables. "
+ "If unset, Fluss keeps the legacy behavior where configuring `datalake.format` "
+ "also enables lakehouse tables. If set to `false`, Fluss pre-binds the lake format "
+ "for newly created tables but does not allow lakehouse tables yet. If set to `true`, "
+ "Fluss fully enables lakehouse tables. When this option is explicitly configured, "
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The datalake.enabled description says that when the option is "explicitly configured", datalake.format must also be configured, but the server-side validation/enforcement currently only requires datalake.format when datalake.enabled is explicitly set to true. Please align the wording with the actual semantics (e.g., specify “explicitly set to true”) to avoid misleading operators.

Suggested change
+ "Fluss fully enables lakehouse tables. When this option is explicitly configured, "
+ "Fluss fully enables lakehouse tables. When this option is explicitly set to `true`, "

Copilot uses AI. Check for mistakes.
+ "`datalake.format` must also be configured.");

public static final ConfigOption<DataLakeFormat> DATALAKE_FORMAT =
key("datalake.format")
.enumType(DataLakeFormat.class)
.noDefaultValue()
.withDescription(
"The datalake format used by of Fluss to be as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
"The datalake format used by Fluss as lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. "
+ "In the future, more kinds of data lake format will be supported, such as DeltaLake or Hudi.");

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,7 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
validateTableCreationPermission(tableDescriptor, tablePath);

// apply system defaults if the config is not set
tableDescriptor =
applySystemDefaults(tableDescriptor, lakeCatalogContainer.getDataLakeFormat());
tableDescriptor = applySystemDefaults(tableDescriptor, lakeCatalogContainer);

// the distribution and bucket count must be set now
//noinspection OptionalGetWithoutIsPresent
Expand Down Expand Up @@ -560,7 +559,10 @@ public static TablePropertyChanges toTablePropertyChanges(List<TableChange> tabl
}

private TableDescriptor applySystemDefaults(
TableDescriptor tableDescriptor, DataLakeFormat dataLakeFormat) {
TableDescriptor tableDescriptor,
LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer) {
DataLakeFormat dataLakeFormat = lakeCatalogContainer.getDataLakeFormat();
boolean clusterDataLakeTableEnabled = lakeCatalogContainer.isClusterDataLakeTableEnabled();
TableDescriptor newDescriptor = tableDescriptor;

// not set bucket num
Expand Down Expand Up @@ -594,7 +596,7 @@ private TableDescriptor applySystemDefaults(

// lake table can only be enabled when the cluster configures datalake format
boolean dataLakeEnabled = isDataLakeEnabled(tableDescriptor);
if (dataLakeEnabled && dataLakeFormat == null) {
if (dataLakeEnabled && !clusterDataLakeTableEnabled) {
throw new InvalidTableException(
String.format(
"'%s' is enabled for the table, but the Fluss cluster doesn't enable datalake tables.",
Expand Down
Loading
Loading