Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions docs/en/engines/database-engines/datalake.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,23 @@ catalog_type,

The following settings are supported:

| Setting | Description |
|-------------------------|-----------------------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `dlf_access_key_id` | Access key ID for DLF access |
| `dlf_access_key_secret` | Access key Secret for DLF access |
| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` |
| Setting | Description |
|---------------------------|-----------------------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `dlf_access_key_id` | Access key ID for DLF access |
| `dlf_access_key_secret` | Access key Secret for DLF access |
| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` |
| `skip_non_iceberg_tables` | Skip non Iceberg tables in catalog, implemented for `glue` and `unity` types. |

## Examples {#examples}

Expand Down
7 changes: 5 additions & 2 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString google_adc_quota_project_id;
extern const DatabaseDataLakeSettingsString google_adc_credentials_file;
extern const DatabaseDataLakeSettingsBool polaris_style_paths;
extern const DatabaseDataLakeSettingsBool skip_non_iceberg_tables;
}

namespace Setting
Expand Down Expand Up @@ -241,7 +242,8 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
url,
settings[DatabaseDataLakeSetting::catalog_credential].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
Context::getGlobalContextInstance(),
settings[DatabaseDataLakeSetting::skip_non_iceberg_tables].value);
break;
}

Expand All @@ -251,7 +253,8 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
url,
Context::getGlobalContextInstance(),
catalog_parameters,
table_engine_definition);
table_engine_definition,
settings[DatabaseDataLakeSetting::skip_non_iceberg_tables].value);
break;
}
case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace ErrorCodes
DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \
DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \
DECLARE(Bool, polaris_style_paths, true, "Enable Polaris/ADLS Gen2 path convention: the container name is prepended to the path in ABFSS locations (e.g. abfss://c@account/c/actual/path). When enabled, the redundant container prefix is stripped when building Azure HTTPS URLs. Disable if a real directory inside the container has the same name as the container itself.", 0) \
DECLARE(Bool, skip_non_iceberg_tables, false, "Skip non iceberg tables in catalog", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
Expand Down
14 changes: 13 additions & 1 deletion src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ GlueCatalog::GlueCatalog(
const String & endpoint,
DB::ContextPtr context_,
const CatalogSettings & settings_,
DB::ASTPtr table_engine_definition_)
DB::ASTPtr table_engine_definition_,
bool skip_non_iceberg_tables_)
: ICatalog("")
, DB::WithContext(context_)
, log(getLogger("GlueCatalog(" + settings_.region + ")"))
, region(settings_.region)
, settings(settings_)
, table_engine_definition(table_engine_definition_)
, skip_non_iceberg_tables(skip_non_iceberg_tables_)
, metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
{
DB::S3::CredentialsConfiguration creds_config;
Expand Down Expand Up @@ -242,6 +244,16 @@ DB::Names GlueCatalog::getTablesForDatabase(const std::string & db_name, size_t
if (table.GetStorageDescriptor().GetColumns().empty())
continue;

if (skip_non_iceberg_tables)
{
std::string table_type;
if (table.GetParameters().contains("table_type"))
table_type = table.GetParameters().at("table_type");

if (Poco::toUpper(table_type) != "ICEBERG")
continue;
}

if (limit != 0 && result.size() >= limit)
break;
result.push_back(db_name + "." + table.GetName());
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/GlueCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
const String & endpoint,
DB::ContextPtr context_,
const CatalogSettings & settings_,
DB::ASTPtr table_engine_definition_);
DB::ASTPtr table_engine_definition_,
bool skip_non_iceberg_tables_);

~GlueCatalog() override;

Expand Down Expand Up @@ -81,6 +82,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
CatalogSettings settings;
DB::ASTPtr table_engine_definition;
std::unordered_set<std::string> allowed_namespaces;
bool skip_non_iceberg_tables = false;

bool isNamespaceAllowed(const std::string & namespace_) const;

Expand Down
26 changes: 24 additions & 2 deletions src/Databases/DataLake/UnityCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ static const auto TEMPORARY_CREDENTIALS_ENDPOINT = "temporary-table-credentials"
static const std::unordered_set<std::string> READABLE_TABLES = {"TABLE_DELTA", "TABLE_DELTA_EXTERNAL"};
static const auto READABLE_DATA_SOURCE_FORMAT = "DELTA";

bool isReadableDeltaTable(const Poco::JSON::Object::Ptr & table_json)
{
bool has_securable_kind = hasValueAndItsNotNone("securable_kind", table_json);
bool has_data_source_format = hasValueAndItsNotNone("data_source_format", table_json);

if (has_securable_kind && !READABLE_TABLES.contains(table_json->get("securable_kind").extract<String>()))
return false;

if (has_data_source_format && table_json->get("data_source_format").extract<String>() != READABLE_DATA_SOURCE_FORMAT)
return false;

if (!has_data_source_format && !has_securable_kind)
return false;

return true;
}

struct UnityCatalogFullSchemaName
{
std::string catalog_name;
Expand Down Expand Up @@ -206,7 +223,7 @@ bool UnityCatalog::tryGetTableMetadata(
{
result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has unsupported data_source_format '{}'. " \
"It means that it's unreadable with Unity catalog in ClickHouse, readable tables must have data_source_format == '{}'",
full_table_name, object->get("securable_kind").extract<String>(), READABLE_DATA_SOURCE_FORMAT));
full_table_name, object->get("data_source_format").extract<String>(), READABLE_DATA_SOURCE_FORMAT));
}

if (!has_data_source_format && !has_securable_kind)
Expand Down Expand Up @@ -340,6 +357,9 @@ DB::Names UnityCatalog::getTablesForSchema(const std::string & schema, size_t li
const auto current_table_json = tables_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
const auto table_name = current_table_json->get("name").extract<String>();

if (skip_non_iceberg_tables && !isReadableDeltaTable(current_table_json))
continue;

tables.push_back(schema + "." + table_name);
if (limit && tables.size() >= limit)
break;
Expand Down Expand Up @@ -445,12 +465,14 @@ UnityCatalog::UnityCatalog(
const std::string & base_url_,
const std::string & catalog_credential_,
const std::string & namespaces_,
DB::ContextPtr context_)
DB::ContextPtr context_,
bool skip_non_iceberg_tables_)
: ICatalog(catalog_)
, DB::WithContext(context_)
, base_url(base_url_)
, log(getLogger("UnityCatalog(" + catalog_ + ")"))
, auth_header("Authorization", "Bearer " + catalog_credential_)
, skip_non_iceberg_tables(skip_non_iceberg_tables_)
{
boost::split(allowed_namespaces, namespaces_, boost::is_any_of(", "), boost::token_compress_on);
}
Expand Down
5 changes: 4 additions & 1 deletion src/Databases/DataLake/UnityCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class UnityCatalog final : public ICatalog, private DB::WithContext
const std::string & base_url_,
const std::string & catalog_credential_,
const std::string & namespaces_,
DB::ContextPtr context_);
DB::ContextPtr context_,
bool skip_non_iceberg_tables_);

~UnityCatalog() override = default;

Expand Down Expand Up @@ -77,6 +78,8 @@ class UnityCatalog final : public ICatalog, private DB::WithContext
TableMetadata & result) const;

ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override;

bool skip_non_iceberg_tables = false;
};

}
Expand Down
117 changes: 117 additions & 0 deletions tests/integration/test_database_delta/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@


CATALOG_NAME = "unity_catalog_test_db"
UC_LOG = "/var/lib/clickhouse/user_files/unitycatalog/uc.log"
UNITY_CATALOG_API = "http://localhost:8080/api/2.1/unity-catalog"


def start_unity_catalog(node):
Expand Down Expand Up @@ -142,6 +144,51 @@ def execute_multiple_spark_queries(node, queries_list):
return execute_spark_query(node, ";".join(queries_list))


def create_unity_text_table_via_rest(node, schema_name, table_name, storage_location):
"""Register a non-Delta TEXT table in Unity Catalog via REST API."""
payload = {
"name": table_name,
"catalog_name": "unity",
"schema_name": schema_name,
"table_type": "EXTERNAL",
"data_source_format": "TEXT",
"storage_location": storage_location,
"columns": [
{
"name": "id",
"type_text": "bigint",
"type_json": '"bigint"',
"type_name": "LONG",
"position": 0,
"nullable": "True",
},
{
"name": "text",
"type_text": "string",
"type_json": '"string"',
"type_name": "STRING",
"position": 1,
"nullable": "True",
},
],
}
script = f"""
import json
import urllib.request

payload = {json.dumps(payload)}
req = urllib.request.Request(
"{UNITY_CATALOG_API}/tables",
data=json.dumps(payload).encode(),
headers={{"Content-Type": "application/json"}},
method="POST",
)
with urllib.request.urlopen(req) as resp:
resp.read()
"""
node.exec_in_container(["python3", "-c", script])


@pytest.mark.parametrize("use_delta_kernel", ["1", "0"])
def test_embedded_database_and_tables(started_cluster, use_delta_kernel):
test_uuid = str(uuid.uuid4()).replace("-", "_")
Expand Down Expand Up @@ -710,3 +757,73 @@ def create_namespace(suffix):

assert node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}alpha.{table_name}`") == "0\n"
assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"SELECT count() FROM {CATALOG_NAME}.`{namespace_prefix}bravo.{table_name}`")


def test_non_delta_text_table(started_cluster):
"""TEXT table in Unity Catalog (non-Delta) is skipped from SHOW TABLES when setting is enabled."""
node1 = started_cluster.instances["node1"]

test_uuid = str(uuid.uuid4()).replace("-", "_")
schema_name = f"non_delta_{test_uuid}"
text_table_name = f"text_{test_uuid}"
delta_table_name = f"delta_{test_uuid}"
db_name = f"db_{test_uuid}"

text_location = f"file:///var/lib/clickhouse/user_files/tmp/{schema_name}/{text_table_name}"
delta_location = f"/var/lib/clickhouse/user_files/tmp/{schema_name}/{delta_table_name}"

node1.exec_in_container(
["bash", "-c", f"mkdir -p /var/lib/clickhouse/user_files/tmp/{schema_name}/{text_table_name}"],
nothrow=True,
)

execute_multiple_spark_queries(
node1,
[
f"CREATE SCHEMA {schema_name}",
f"CREATE TABLE {schema_name}.{delta_table_name} (id INT, value DOUBLE) USING DELTA LOCATION '{delta_location}'",
f"INSERT INTO {schema_name}.{delta_table_name} VALUES (1, 1.5)",
],
)
create_unity_text_table_via_rest(node1, schema_name, text_table_name, text_location)

node1.query(
f"""CREATE DATABASE {db_name} ENGINE = DataLakeCatalog('{UNITY_CATALOG_API}')
SETTINGS warehouse = 'unity', catalog_type = 'unity', vended_credentials = false, skip_non_iceberg_tables = true""",
settings={"allow_experimental_database_unity_catalog": "1"},
)

tables = node1.query(
f"SHOW TABLES FROM {db_name} LIKE '{schema_name}.%'",
settings={"use_hive_partitioning": "0"},
).strip().split("\n")
assert f"{schema_name}.{text_table_name}" not in tables
assert f"{schema_name}.{delta_table_name}" in tables

tables = node1.query(f"SELECT name FROM system.tables WHERE database = '{db_name}' SETTINGS show_data_lake_catalogs_in_system_tables=1").strip().split("\n")
assert f"{schema_name}.{text_table_name}" not in tables
assert f"{schema_name}.{delta_table_name}" in tables

# Even though the TEXT table is hidden from SHOW TABLES, it can still be accessed directly by name, and shows correct schema.
show_create_text = node1.query(
f"SHOW CREATE TABLE {db_name}.`{schema_name}.{text_table_name}`"
)
assert "ENGINE = Other" in show_create_text

show_create_delta = node1.query(
f"SHOW CREATE TABLE {db_name}.`{schema_name}.{delta_table_name}`"
)
assert "ENGINE = DeltaLake" in show_create_delta

select_error = node1.query_and_get_error(
f"SELECT * FROM {db_name}.`{schema_name}.{text_table_name}`"
)
assert "unsupported data_source_format" in select_error
assert "DELTA" in select_error

assert node1.query(
f"SELECT id, value FROM {db_name}.`{schema_name}.{delta_table_name}`",
settings={"use_hive_partitioning": "0"},
).strip() == "1\t1.5"

node1.query(f"DROP DATABASE IF EXISTS {db_name} SYNC")
Loading
Loading