From b4cb5aa6125fc6b7a4f2dc007ccdcc23db54b8d6 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 20 May 2026 13:43:09 +0200 Subject: [PATCH] DataLake setting 'skip_non_iceberg_tables' Partially with AI assistance (tests, unity implementation) --- docs/en/engines/database-engines/datalake.md | 33 ++--- src/Databases/DataLake/DatabaseDataLake.cpp | 7 +- .../DataLake/DatabaseDataLakeSettings.cpp | 1 + src/Databases/DataLake/GlueCatalog.cpp | 14 ++- src/Databases/DataLake/GlueCatalog.h | 4 +- src/Databases/DataLake/UnityCatalog.cpp | 26 +++- src/Databases/DataLake/UnityCatalog.h | 5 +- tests/integration/test_database_delta/test.py | 117 ++++++++++++++++++ tests/integration/test_database_glue/test.py | 103 +++++++++++++++ 9 files changed, 287 insertions(+), 23 deletions(-) diff --git a/docs/en/engines/database-engines/datalake.md b/docs/en/engines/database-engines/datalake.md index 9c20511172b3..959076147f56 100644 --- a/docs/en/engines/database-engines/datalake.md +++ b/docs/en/engines/database-engines/datalake.md @@ -46,22 +46,23 @@ catalog_type, The following settings are supported: -| Setting | Description | -|-------------------------|-----------------------------------------------------------------------------------------------| -| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) | -| `warehouse` | The warehouse/database name to use in the catalog. 
| -| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | -| `auth_header` | Custom HTTP header for authentication with the catalog service | -| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | -| `storage_endpoint` | Endpoint URL for the underlying storage | -| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | -| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) | -| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | -| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | -| `region` | AWS region for the service (e.g., `us-east-1`) | -| `dlf_access_key_id` | Access key ID for DLF access | -| `dlf_access_key_secret` | Access key Secret for DLF access | -| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` | +| Setting | Description | +|---------------------------|-----------------------------------------------------------------------------------------------| +| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) | +| `warehouse` | The warehouse/database name to use in the catalog. 
| +| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) | +| `auth_header` | Custom HTTP header for authentication with the catalog service | +| `auth_scope` | OAuth2 scope for authentication (if using OAuth) | +| `storage_endpoint` | Endpoint URL for the underlying storage | +| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | +| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) | +| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | +| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | +| `region` | AWS region for the service (e.g., `us-east-1`) | +| `dlf_access_key_id` | Access key ID for DLF access | +| `dlf_access_key_secret` | Access key Secret for DLF access | +| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` | +| `skip_non_iceberg_tables` | Hide unreadable tables from table listings (default: `false`). Implemented for `glue` (skips tables whose `table_type` is not `ICEBERG`) and `unity` (skips tables that are not readable Delta tables). Hidden tables can still be addressed directly by name.
| ## Examples {#examples} diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index ad4adcdb8432..b165278990e0 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -82,6 +82,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString google_adc_quota_project_id; extern const DatabaseDataLakeSettingsString google_adc_credentials_file; extern const DatabaseDataLakeSettingsBool polaris_style_paths; + extern const DatabaseDataLakeSettingsBool skip_non_iceberg_tables; } namespace Setting @@ -241,7 +242,8 @@ std::shared_ptr DatabaseDataLake::getCatalog() const url, settings[DatabaseDataLakeSetting::catalog_credential].value, settings[DatabaseDataLakeSetting::namespaces].value, - Context::getGlobalContextInstance()); + Context::getGlobalContextInstance(), + settings[DatabaseDataLakeSetting::skip_non_iceberg_tables].value); break; } @@ -251,7 +253,8 @@ std::shared_ptr DatabaseDataLake::getCatalog() const url, Context::getGlobalContextInstance(), catalog_parameters, - table_engine_definition); + table_engine_definition, + settings[DatabaseDataLakeSetting::skip_non_iceberg_tables].value); break; } case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE: diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index c65e0f18cea7..54da0cd4f884 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -46,6 +46,7 @@ namespace ErrorCodes DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \ DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \ DECLARE(Bool, polaris_style_paths, true, "Enable Polaris/ADLS Gen2 path convention: the container name is prepended to the path in ABFSS locations (e.g. abfss://c@account/c/actual/path). 
When enabled, the redundant container prefix is stripped when building Azure HTTPS URLs. Disable if a real directory inside the container has the same name as the container itself.", 0) \ + DECLARE(Bool, skip_non_iceberg_tables, false, "Skip non iceberg tables in catalog", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index 4d8bccf19fe8..6d038b255b8a 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -89,13 +89,15 @@ GlueCatalog::GlueCatalog( const String & endpoint, DB::ContextPtr context_, const CatalogSettings & settings_, - DB::ASTPtr table_engine_definition_) + DB::ASTPtr table_engine_definition_, + bool skip_non_iceberg_tables_) : ICatalog("") , DB::WithContext(context_) , log(getLogger("GlueCatalog(" + settings_.region + ")")) , region(settings_.region) , settings(settings_) , table_engine_definition(table_engine_definition_) + , skip_non_iceberg_tables(skip_non_iceberg_tables_) , metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024) { DB::S3::CredentialsConfiguration creds_config; @@ -242,6 +244,16 @@ DB::Names GlueCatalog::getTablesForDatabase(const std::string & db_name, size_t if (table.GetStorageDescriptor().GetColumns().empty()) continue; + if (skip_non_iceberg_tables) + { + std::string table_type; + if (table.GetParameters().contains("table_type")) + table_type = table.GetParameters().at("table_type"); + + if (Poco::toUpper(table_type) != "ICEBERG") + continue; + } + if (limit != 0 && result.size() >= limit) break; result.push_back(db_name + "." 
+ table.GetName()); diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h index 6208bb284195..231b986c828c 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -33,7 +33,8 @@ class GlueCatalog final : public ICatalog, private DB::WithContext const String & endpoint, DB::ContextPtr context_, const CatalogSettings & settings_, - DB::ASTPtr table_engine_definition_); + DB::ASTPtr table_engine_definition_, + bool skip_non_iceberg_tables_); ~GlueCatalog() override; @@ -81,6 +82,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext CatalogSettings settings; DB::ASTPtr table_engine_definition; std::unordered_set allowed_namespaces; + bool skip_non_iceberg_tables = false; bool isNamespaceAllowed(const std::string & namespace_) const; diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp index 05709fd14c89..092624fb6c78 100644 --- a/src/Databases/DataLake/UnityCatalog.cpp +++ b/src/Databases/DataLake/UnityCatalog.cpp @@ -37,6 +37,23 @@ static const auto TEMPORARY_CREDENTIALS_ENDPOINT = "temporary-table-credentials" static const std::unordered_set READABLE_TABLES = {"TABLE_DELTA", "TABLE_DELTA_EXTERNAL"}; static const auto READABLE_DATA_SOURCE_FORMAT = "DELTA"; +bool isReadableDeltaTable(const Poco::JSON::Object::Ptr & table_json) +{ + bool has_securable_kind = hasValueAndItsNotNone("securable_kind", table_json); + bool has_data_source_format = hasValueAndItsNotNone("data_source_format", table_json); + + if (has_securable_kind && !READABLE_TABLES.contains(table_json->get("securable_kind").extract())) + return false; + + if (has_data_source_format && table_json->get("data_source_format").extract() != READABLE_DATA_SOURCE_FORMAT) + return false; + + if (!has_data_source_format && !has_securable_kind) + return false; + + return true; +} + struct UnityCatalogFullSchemaName { std::string catalog_name; @@ -206,7 +223,7 @@ bool 
UnityCatalog::tryGetTableMetadata( { result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has unsupported data_source_format '{}'. " \ "It means that it's unreadable with Unity catalog in ClickHouse, readable tables must have data_source_format == '{}'", - full_table_name, object->get("securable_kind").extract(), READABLE_DATA_SOURCE_FORMAT)); + full_table_name, object->get("data_source_format").extract(), READABLE_DATA_SOURCE_FORMAT)); } if (!has_data_source_format && !has_securable_kind) @@ -340,6 +357,9 @@ DB::Names UnityCatalog::getTablesForSchema(const std::string & schema, size_t li const auto current_table_json = tables_object->get(static_cast(i)).extract(); const auto table_name = current_table_json->get("name").extract(); + if (skip_non_iceberg_tables && !isReadableDeltaTable(current_table_json)) + continue; + tables.push_back(schema + "." + table_name); if (limit && tables.size() >= limit) break; @@ -445,12 +465,14 @@ UnityCatalog::UnityCatalog( const std::string & base_url_, const std::string & catalog_credential_, const std::string & namespaces_, - DB::ContextPtr context_) + DB::ContextPtr context_, + bool skip_non_iceberg_tables_) : ICatalog(catalog_) , DB::WithContext(context_) , base_url(base_url_) , log(getLogger("UnityCatalog(" + catalog_ + ")")) , auth_header("Authorization", "Bearer " + catalog_credential_) + , skip_non_iceberg_tables(skip_non_iceberg_tables_) { boost::split(allowed_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on); } diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h index a230b4233467..28b02d248060 100644 --- a/src/Databases/DataLake/UnityCatalog.h +++ b/src/Databases/DataLake/UnityCatalog.h @@ -22,7 +22,8 @@ class UnityCatalog final : public ICatalog, private DB::WithContext const std::string & base_url_, const std::string & catalog_credential_, const std::string & namespaces_, - DB::ContextPtr context_); + DB::ContextPtr context_, + bool 
skip_non_iceberg_tables_); ~UnityCatalog() override = default; @@ -77,6 +78,8 @@ class UnityCatalog final : public ICatalog, private DB::WithContext TableMetadata & result) const; ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override; + + bool skip_non_iceberg_tables = false; }; } diff --git a/tests/integration/test_database_delta/test.py b/tests/integration/test_database_delta/test.py index 1b5e73090ead..0fe7ab4a2575 100644 --- a/tests/integration/test_database_delta/test.py +++ b/tests/integration/test_database_delta/test.py @@ -25,6 +25,8 @@ CATALOG_NAME = "unity_catalog_test_db" +UC_LOG = "/var/lib/clickhouse/user_files/unitycatalog/uc.log" +UNITY_CATALOG_API = "http://localhost:8080/api/2.1/unity-catalog" def start_unity_catalog(node): @@ -142,6 +144,51 @@ def execute_multiple_spark_queries(node, queries_list): return execute_spark_query(node, ";".join(queries_list)) +def create_unity_text_table_via_rest(node, schema_name, table_name, storage_location): + """Register a non-Delta TEXT table in Unity Catalog via REST API.""" + payload = { + "name": table_name, + "catalog_name": "unity", + "schema_name": schema_name, + "table_type": "EXTERNAL", + "data_source_format": "TEXT", + "storage_location": storage_location, + "columns": [ + { + "name": "id", + "type_text": "bigint", + "type_json": '"bigint"', + "type_name": "LONG", + "position": 0, + "nullable": "True", + }, + { + "name": "text", + "type_text": "string", + "type_json": '"string"', + "type_name": "STRING", + "position": 1, + "nullable": "True", + }, + ], + } + script = f""" +import json +import urllib.request + +payload = {json.dumps(payload)} +req = urllib.request.Request( + "{UNITY_CATALOG_API}/tables", + data=json.dumps(payload).encode(), + headers={{"Content-Type": "application/json"}}, + method="POST", +) +with urllib.request.urlopen(req) as resp: + resp.read() +""" + node.exec_in_container(["python3", "-c", script]) + + 
@pytest.mark.parametrize("use_delta_kernel", ["1", "0"]) def test_embedded_database_and_tables(started_cluster, use_delta_kernel): test_uuid = str(uuid.uuid4()).replace("-", "_") @@ -710,3 +757,73 @@ def create_namespace(suffix): assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n" assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`") + + +def test_non_delta_text_table(started_cluster): + """TEXT table in Unity Catalog (non-Delta) is skipped from SHOW TABLES when setting is enabled.""" + node1 = started_cluster.instances["node1"] + + test_uuid = str(uuid.uuid4()).replace("-", "_") + schema_name = f"non_delta_{test_uuid}" + text_table_name = f"text_{test_uuid}" + delta_table_name = f"delta_{test_uuid}" + db_name = f"db_{test_uuid}" + + text_location = f"file:///var/lib/clickhouse/user_files/tmp/{schema_name}/{text_table_name}" + delta_location = f"/var/lib/clickhouse/user_files/tmp/{schema_name}/{delta_table_name}" + + node1.exec_in_container( + ["bash", "-c", f"mkdir -p /var/lib/clickhouse/user_files/tmp/{schema_name}/{text_table_name}"], + nothrow=True, + ) + + execute_multiple_spark_queries( + node1, + [ + f"CREATE SCHEMA {schema_name}", + f"CREATE TABLE {schema_name}.{delta_table_name} (id INT, value DOUBLE) USING DELTA LOCATION '{delta_location}'", + f"INSERT INTO {schema_name}.{delta_table_name} VALUES (1, 1.5)", + ], + ) + create_unity_text_table_via_rest(node1, schema_name, text_table_name, text_location) + + node1.query( + f"""CREATE DATABASE {db_name} ENGINE = DataLakeCatalog('{UNITY_CATALOG_API}') +SETTINGS warehouse = 'unity', catalog_type = 'unity', vended_credentials = false, skip_non_iceberg_tables = true""", + settings={"allow_experimental_database_unity_catalog": "1"}, + ) + + tables = node1.query( + f"SHOW TABLES FROM {db_name} LIKE '{schema_name}.%'", + settings={"use_hive_partitioning": 
"0"}, + ).strip().split("\n") + assert f"{schema_name}.{text_table_name}" not in tables + assert f"{schema_name}.{delta_table_name}" in tables + + tables = node1.query(f"SELECT name FROM system.tables WHERE database = '{db_name}' SETTINGS show_data_lake_catalogs_in_system_tables=1").strip().split("\n") + assert f"{schema_name}.{text_table_name}" not in tables + assert f"{schema_name}.{delta_table_name}" in tables + + # Even though the TEXT table is hidden from SHOW TABLES, it can still be accessed directly by name, and shows correct schema. + show_create_text = node1.query( + f"SHOW CREATE TABLE {db_name}.`{schema_name}.{text_table_name}`" + ) + assert "ENGINE = Other" in show_create_text + + show_create_delta = node1.query( + f"SHOW CREATE TABLE {db_name}.`{schema_name}.{delta_table_name}`" + ) + assert "ENGINE = DeltaLake" in show_create_delta + + select_error = node1.query_and_get_error( + f"SELECT * FROM {db_name}.`{schema_name}.{text_table_name}`" + ) + assert "unsupported data_source_format" in select_error + assert "DELTA" in select_error + + assert node1.query( + f"SELECT id, value FROM {db_name}.`{schema_name}.{delta_table_name}`", + settings={"use_hive_partitioning": "0"}, + ).strip() == "1\t1.5" + + node1.query(f"DROP DATABASE IF EXISTS {db_name} SYNC") diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py index 4738a3ac6977..f1078c826840 100644 --- a/tests/integration/test_database_glue/test.py +++ b/tests/integration/test_database_glue/test.py @@ -49,6 +49,42 @@ def run_s3_mocks(started_cluster, args=[]): BASE_URL = "http://glue:3000" BASE_URL_LOCAL_HOST = "http://localhost:3000" + +def get_glue_client(started_cluster): + return boto3.client( + "glue", region_name="us-east-1", endpoint_url=BASE_URL_LOCAL_HOST + ) + + +def create_hive_text_table(glue_client, database_name, table_name, columns, location): + """Register a non-Iceberg Hive external text table in Glue (boto3 API). 
+ Spark SQL equivalent: + CREATE TABLE db.table (id BIGINT, text STRING) USING text ... + """ + glue_client.create_table( + DatabaseName=database_name, + TableInput={ + "Name": table_name, + "TableType": "EXTERNAL_TABLE", + "Parameters": { + "EXTERNAL": "TRUE", + "spark.sql.sources.provider": "text", + }, + "StorageDescriptor": { + "Columns": [{"Name": name, "Type": typ} for name, typ in columns], + "Location": location, + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "SerdeInfo": { + "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "Parameters": {"field.delim": "\t"}, + }, + }, + }, + ) + + def generate_decimal(precision=9, scale=2): max_value = 10**(precision - scale) - 1 value = random.uniform(0, max_value) @@ -894,3 +930,70 @@ def test_sts_smoke(started_cluster): # Cleanup node.query(f"DROP DATABASE IF EXISTS {db_name_fail} SYNC") node.query(f"DROP DATABASE IF EXISTS {db_name_success} SYNC") + + +def test_non_iceberg_hive_text_table(started_cluster): + """Hive text table in Glue (no table_type=ICEBERG) is hidden from table listings when skip_non_iceberg_tables is enabled; it stays addressable by name but is not readable.""" + node = started_cluster.instances["node1"] + + test_ref = f"test_non_iceberg_{uuid.uuid4().hex}" + root_namespace = f"{test_ref}_ns" + hive_table_name = f"{test_ref}_hive_text" + iceberg_table_name = f"{test_ref}_iceberg" + db_name = f"db_{test_ref}" + + glue_client = get_glue_client(started_cluster) + glue_client.create_database(DatabaseInput={"Name": root_namespace}) + + hive_location = f"s3://warehouse-glue/{root_namespace}/{hive_table_name}/" + create_hive_text_table( + glue_client, + root_namespace, + hive_table_name, + [("id", "bigint"), ("text", "string")], + hive_location, + ) + + catalog = load_catalog_impl(started_cluster) + iceberg_table = create_table( + catalog, + root_namespace, + iceberg_table_name, + dir=f"{root_namespace}/{iceberg_table_name}", + ) +
iceberg_table.append(generate_arrow_data(1)) + + create_clickhouse_glue_database(started_cluster, node, db_name, additional_settings={"skip_non_iceberg_tables": True}) + + tables = node.query(f"SHOW TABLES FROM {db_name}").strip().split("\n") + assert f"{root_namespace}.{hive_table_name}" not in tables + assert f"{root_namespace}.{iceberg_table_name}" in tables + + tables = node.query(f"SELECT name FROM system.tables WHERE database = '{db_name}' SETTINGS show_data_lake_catalogs_in_system_tables=1").strip().split("\n") + assert f"{root_namespace}.{hive_table_name}" not in tables + assert f"{root_namespace}.{iceberg_table_name}" in tables + + # Direct query to non-iceberg table should work and show it's an Other engine, but select should fail + show_create_hive = node.query( + f"SHOW CREATE TABLE {db_name}.`{root_namespace}.{hive_table_name}`" + ) + assert "ENGINE = Other" in show_create_hive + assert "`id` Int64" in show_create_hive + assert "`text` String" in show_create_hive + + show_create_iceberg = node.query( + f"SHOW CREATE TABLE {db_name}.`{root_namespace}.{iceberg_table_name}`" + ) + assert "ENGINE = Iceberg" in show_create_iceberg + + select_error = node.query_and_get_error( + f"SELECT * FROM {db_name}.`{root_namespace}.{hive_table_name}`" + ) + assert "no table_type" in select_error + assert "ICEBERG" in select_error + + assert int( + node.query(f"SELECT count() FROM {db_name}.`{root_namespace}.{iceberg_table_name}`") + ) == 1 + + node.query(f"DROP DATABASE IF EXISTS {db_name} SYNC")