From fde486d90aa92b6d8ccc50f9bab36ce09c9966ab Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Wed, 20 May 2026 22:28:37 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/ClickHouse/ClickHouse/pull/90740 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adapted PR #90740 (Read iceberg from various paths) to antalya-26.3 without applying the prerequisite upstream PR #100420 (IcebergPath / path_resolver refactor). The refactor is dropped; raw `String` paths are used instead. Adaptations from PR 90740 to antalya-26.3: - `IcebergPathFromMetadata` references → plain `String` (no `.serialize()`, no `IcebergPathFromMetadata::deserialize` wrapping). - `IcebergPathResolver & path_resolver` parameters → `const String & table_location`. Calls like `path_resolver.resolve(x)` become `x`. - `SecondaryStorages` infrastructure kept: thread-safe map of secondary object storages plus a `resolveObjectStorageForPath` helper that maps a metadata path to a (storage, key) pair. The IcebergPath-aware overload of `resolveObjectStorageForPath` was removed. - New protocol version `DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7` used in `IcebergObjectSerializableInfo::{serializeForClusterFunctionProtocol, deserializeForClusterFunctionProtocol}` to gate the new `data_object_file_metadata_path` field and `requires_external_storage` check; `_path` for delete files goes through `SchemeAuthorityKey` on older protocols. Dropped (depend on upstream commits not on antalya-26.3): - `ExpireSnapshotsExecute.{cpp,h}`, `RemoveOrphanFilesExecute.{cpp,h}`, `SnapshotFilesTraversal.{cpp,h}` — extracted EXECUTE handlers from upstream PR introducing per-command refactor. PR 90740 only threads `secondary_storages` into these; the underlying refactor is a separate dependency. The antalya-26.3 `Iceberg::expireSnapshots` path is kept unchanged in `IcebergMetadata::executeCommand`. 
- `executeExpireSnapshots` / `executeRemoveOrphanFiles` dispatch in `IcebergMetadata::executeCommand` — depends on the dropped files. References: - Upstream PR: https://github.com/ClickHouse/ClickHouse/pull/90740 - antalya-26.1 backport (used as a structural reference for the no-IcebergPath adaptation): 0520e2ee3b9 ("Allow to read iceberg table data from any location") Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Core/ProtocolDefines.h | 3 +- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 21 + src/Databases/DataLake/DatabaseDataLake.cpp | 4 +- src/IO/S3/URI.cpp | 14 +- src/IO/S3/URI.h | 3 +- src/Interpreters/ClusterFunctionReadTask.cpp | 6 +- src/Interpreters/IcebergMetadataLog.cpp | 6 +- .../DataLakes/IDataLakeMetadata.h | 3 + .../DataLakes/Iceberg/Compaction.cpp | 103 ++- .../DataLakes/Iceberg/Compaction.h | 2 + .../Iceberg/IcebergDataObjectInfo.cpp | 58 +- .../DataLakes/Iceberg/IcebergDataObjectInfo.h | 36 +- .../DataLakes/Iceberg/IcebergIterator.cpp | 60 +- .../DataLakes/Iceberg/IcebergIterator.h | 11 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 29 +- .../DataLakes/Iceberg/IcebergMetadata.h | 4 + .../DataLakes/Iceberg/IcebergWrites.cpp | 1 + .../Iceberg/ManifestFileIterator.cpp | 1 + .../DataLakes/Iceberg/Mutations.h | 1 + .../Iceberg/PositionDeleteTransform.cpp | 14 +- .../Iceberg/PositionDeleteTransform.h | 21 +- .../Iceberg/StatelessMetadataFileGetter.cpp | 30 +- .../Iceberg/StatelessMetadataFileGetter.h | 10 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 30 +- .../ObjectStorage/DataLakes/Iceberg/Utils.h | 18 +- .../StorageObjectStorageSource.cpp | 96 ++- .../StorageObjectStorageSource.h | 6 + ...rageObjectStorageStableTaskDistributor.cpp | 8 + src/Storages/ObjectStorage/Utils.cpp | 585 ++++++++++++++++++ src/Storages/ObjectStorage/Utils.h | 40 ++ src/Storages/StorageURL.cpp | 6 +- .../__init__.py | 0 .../configs/config.d/cluster.xml | 20 + .../configs/config.d/named_collections.xml | 15 + .../configs/config.d/query_log.xml | 
6 + .../configs/users.d/users.xml | 9 + .../test_storage_iceberg_multistorage/test.py | 426 +++++++++++++ .../test_array_evolved_with_struct.py | 2 +- .../metadata/v1.metadata.json | 4 +- .../metadata/v1.metadata.json | 4 +- .../metadata/v1.metadata.json | 4 +- 42 files changed, 1607 insertions(+), 116 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_multistorage/__init__.py create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml create mode 100644 tests/integration/test_storage_iceberg_multistorage/test.py diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index ae7cff5d0e38..0c670bcc3985 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -39,7 +39,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_META static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH; static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index b292429b836f..d0d73eb643f9 100644 --- a/src/Core/Settings.cpp 
+++ b/src/Core/Settings.cpp @@ -591,6 +591,9 @@ Use multiple threads for azure multipart upload. )", 0) \ DECLARE(Bool, s3_throw_on_zero_files_match, false, R"( Throw an error, when ListObjects request cannot match any files +)", 0) \ + DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"( +Credentials from the base storage are always propagated to secondary object storages when endpoints match. When this setting is enabled, credentials are also propagated when endpoints differ, including less secure connections (for example, from `https` to plain `http`). )", 0) \ DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"( Throw an error if matched zero files according to glob expansion rules. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9520753d7864..f130ef7f15c6 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -42,6 +42,27 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya", { {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, + {"optimize_dictget_tuple_element", false, true, "Rewrite tupleElement(dictGet(..., tuple_of_attrs, ...), N) into a single-attribute dictGet call."}, + {"parallel_replicas_prefer_local_replica", true, true, "New setting. 
When disabled, replicas for parallel reading are selected purely by the load balancing algorithm without forcing the local replica into the set."}, + {"predicate_statistics_sample_rate", 0, 0, "New setting to collect predicate selectivity statistics into system.predicate_statistics_log"}, + {"allow_experimental_geo_types_in_iceberg", false, false, "New setting to allow parsing Iceberg geometry/geography fields as Geometry type."}, + {"output_format_parquet_use_custom_encoder", true, true, "Obsolete setting, the custom encoder is now always used."}, + {"output_format_parquet_version", "2.latest", "2.latest", "Obsolete setting, the custom encoder always writes Parquet V2.6+."}, + {"output_format_parquet_compliant_nested_types", true, true, "Obsolete setting, the custom encoder always uses compliant nested types."}, + {"output_format_parquet_unsupported_types_as_binary", false, false, "Obsolete setting, the native writer always throws UNKNOWN_TYPE for unsupported types."}, + {"input_format_parquet_use_native_reader_v3", true, true, "Obsolete setting, the native reader v3 is now always used."}, + {"max_bytes_ratio_before_external_join", 0., 0., "New setting: ratio of available memory used as the spill threshold for hash joins. Combined with the absolute `max_bytes_before_external_join` (the smaller of the two applies)."}, + {"allow_key_condition_coalesce_rewrite", false, true, "New setting to rewrite predicates of the form `coalesce(a_1, ..., a_N) const` (and equivalently `ifNull`, or with the constant on the left) into a disjunction before index analysis, so per-column primary key and skip indexes on each `a_i` can be used. 
Partial-constant forms such as `coalesce(a, 42, b)` and `coalesce(a, b, 42)` are also handled."}, + {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, + {"url_base", "", "", "New setting to specify the base URL for resolving relative URLs in the url table function and URL table engine."}, + {"max_threads_min_free_memory_per_thread", 0, 1073741824, "New setting to limit the number of threads based on available free memory"}, + {"max_insert_threads_min_free_memory_per_thread", 0, 4294967296, "New setting to limit the number of insert threads based on available free memory"}, + {"enable_blob_storage_log_for_read_operations", false, false, "New setting to log blob storage read operations to system.blob_storage_log"}, + {"max_streams_for_union_step", 0, 0, "New setting to limit the number of simultaneously active data streams in a UNION step to reduce peak memory usage."}, + {"max_streams_for_union_step_to_max_threads_ratio", 0, 8, "New setting: the limit on simultaneously active streams in a UNION step is computed as min(max_streams_for_union_step, max_threads * max_streams_for_union_step_to_max_threads_ratio), either being 0 disables that input."}, + {"send_table_structure_on_insert_with_inline_data", true, true, "New setting to control whether server sends table structure for INSERT queries with inline data."}, + {"use_top_k_dynamic_filtering_for_variable_length_types", true, false, "Disable `use_top_k_dynamic_filtering` for variable-length sort columns (e.g. 
`String`) by default; the previous behavior had the optimization apply unconditionally and is preserved under `compatibility`."}, + {"page_cache_max_coalesced_bytes", 16777216, 16777216, "New setting to bound the size of a single coalesced read used to populate the userspace page cache on cache miss."}, }); addSettingsChanges(settings_changes_history, "26.3", { diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index ad4adcdb8432..d21c75e28ff8 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -575,7 +575,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con LOG_DEBUG(log, "Has no credentials"); } } - else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()) + else if (!lightweight && table_metadata.requiresCredentials() + && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end() + && table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local) { throw Exception( ErrorCodes::BAD_ARGUMENTS, diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index 5ab8e5cfd724..7a6bedfc2a76 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -17,10 +17,12 @@ namespace DB struct URIConverter { - static void modifyURI(Poco::URI & uri, std::unordered_map mapper) + static void modifyURI(Poco::URI & uri, std::unordered_map mapper, bool enable_url_encoding = true) { Macros macros({{"bucket", uri.getHost()}}); - uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery()); + uri = macros.expand(mapper[uri.getScheme()]).empty() + ? 
uri + : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding); } }; @@ -32,7 +34,7 @@ namespace ErrorCodes namespace S3 { -URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters) +URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters, bool enable_url_encoding) { /// Case when bucket name represented in domain name of S3 URL. /// E.g. (https://bucket-name.s3.region.amazonaws.com/key) @@ -54,9 +56,9 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre else uri_str = uri_; - uri = Poco::URI(uri_str); + uri = Poco::URI(uri_str, enable_url_encoding); /// Keep a copy of how Poco parsed the original string before any mapping - Poco::URI original_uri(uri_str); + Poco::URI original_uri(uri_str, enable_url_encoding); bool looks_like_presigned = false; for (const auto & [qk, qv] : original_uri.getQueryParameters()) { @@ -101,7 +103,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre } if (!mapper.empty()) - URIConverter::modifyURI(uri, mapper); + URIConverter::modifyURI(uri, mapper, enable_url_encoding); } storage_name = "S3"; diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index fd45baa39774..4524d4eac946 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -39,7 +39,8 @@ struct URI explicit URI( const std::string & uri_, bool allow_archive_path_syntax = false, - bool keep_presigned_query_parameters = true); + bool keep_presigned_query_parameters = true, + bool enable_url_encoding = true); void addRegionToURI(const std::string & region); static void validateBucket(const std::string & bucket, const Poco::URI & uri); diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index fdacbda9270b..3989a86c5b4d 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -35,10 +35,8 
@@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o data_lake_metadata = object->data_lake_metadata.value(); #if USE_AVRO - if (std::dynamic_pointer_cast(object)) - { - iceberg_info = dynamic_cast(*object).info; - } + if (auto iceberg_object = std::dynamic_pointer_cast(object)) + iceberg_info = iceberg_object->info; #endif file_meta_info = object->relative_path_with_metadata.file_meta_info; diff --git a/src/Interpreters/IcebergMetadataLog.cpp b/src/Interpreters/IcebergMetadataLog.cpp index c0552c14e0c2..da147fcd7bfe 100644 --- a/src/Interpreters/IcebergMetadataLog.cpp +++ b/src/Interpreters/IcebergMetadataLog.cpp @@ -102,12 +102,16 @@ void insertRowToLogTable( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg metadata log table is not configured"); } + String normalized_table_path = table_path; + while (normalized_table_path.size() > 1 && normalized_table_path.back() == '/') + normalized_table_path.pop_back(); + iceberg_metadata_log->add( DB::IcebergMetadataLogElement{ .current_time = spec.tv_sec, .query_id = local_context->getCurrentQueryId(), .content_type = row_log_level, - .table_path = table_path, + .table_path = normalized_table_path, .file_path = file_path, .metadata_content = get_row(), .row_in_file = row_in_file, diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index de35e5d4c4e1..7159eed757dc 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -122,6 +122,9 @@ class IDataLakeMetadata : boost::noncopyable virtual bool operator==(const IDataLakeMetadata & other) const = 0; + /// Returns the full table location URI (e.g. `s3a://bucket/prefix/table/`) + virtual std::string getTableLocation() const { return {}; } + /// Return iterator to `data files`. 
using FileProgressCallback = std::function; virtual ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index a41006d83afa..0b79f2df8c60 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -72,6 +73,9 @@ struct Plan std::unordered_map> manifest_list_to_manifest_files; std::unordered_map>> snapshot_id_to_data_files; std::unordered_map> path_to_data_file; + /// Raw paths of every file referenced by the snapshots being compacted, used at cleanup + /// time to also remove files that live outside the base object_storage. + std::unordered_set referenced_file_paths; FileNamesGenerator generator; Poco::JSON::Object::Ptr initial_metadata_object; @@ -113,6 +117,7 @@ Plan getPlan( const DataLakeStorageSettings & data_lake_settings, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage, + SecondaryStorages & secondary_storages, const String & write_format, ContextPtr context, CompressionMethod compression_method) @@ -155,14 +160,16 @@ Plan getPlan( std::unordered_map> manifest_files; for (const auto & snapshot : snapshots_info) { - auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log); + plan.referenced_file_paths.insert(snapshot.manifest_list_path); + auto manifest_list = getManifestList(object_storage, persistent_table_components, context, snapshot.manifest_list_path, log, secondary_storages); for (const auto & manifest_file : manifest_list) { plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path); if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path)) plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = 
snapshot.snapshot_id; + plan.referenced_file_paths.insert(manifest_file.manifest_file_path); auto files_handle = getManifestFileEntriesHandle( - object_storage, persistent_table_components, context, log, manifest_file, static_cast(current_schema_id)); + object_storage, persistent_table_components, context, log, manifest_file, static_cast(current_schema_id), secondary_storages); if (!manifest_files.contains(manifest_file.manifest_file_path)) { @@ -171,27 +178,39 @@ Plan getPlan( } manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path); for (const auto & pos_delete_file : files_handle.getFilesWithoutDeleted(FileContentType::POSITION_DELETE)) + { all_positional_delete_files.push_back(pos_delete_file); + plan.referenced_file_paths.insert(pos_delete_file->parsed_entry->file_path_key); + } for (const auto & data_file : files_handle.getFilesWithoutDeleted(FileContentType::DATA)) { + plan.referenced_file_paths.insert(data_file->parsed_entry->file_path_key); auto partition_index = plan.partition_encoder.encodePartition(data_file->parsed_entry->partition_key_value); if (plan.partitions.size() <= partition_index) plan.partitions.push_back({}); - IcebergDataObjectInfoPtr data_object_info = std::make_shared(data_file, 0); + const auto & raw_metadata_path = data_file->parsed_entry->file_path_key; + auto [resolved_storage, resolved_key] = resolveObjectStorageForPath( + persistent_table_components.table_location, + raw_metadata_path, object_storage, secondary_storages, context); + + IcebergDataObjectInfoPtr data_object_info = std::make_shared( + data_file, raw_metadata_path, 0, resolved_storage, resolved_key); std::shared_ptr data_file_ptr; - if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path)) + std::string path_identifier + = resolved_storage->getDescription() + '\0' + resolved_storage->getObjectsNamespace() + '\0' + resolved_key; + if (!plan.path_to_data_file.contains(path_identifier)) { data_file_ptr = 
std::make_shared(DataFilePlan{ .data_object_info = data_object_info, .manifest_list = manifest_files[manifest_file.manifest_file_path], .patched_path = plan.generator.generateDataFileName()}); - plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr; + plan.path_to_data_file[path_identifier] = data_file_ptr; } else { - data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path]; + data_file_ptr = plan.path_to_data_file[path_identifier]; } plan.partitions[partition_index].push_back(data_file_ptr); plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back()); @@ -224,7 +243,9 @@ static void writeDataFiles( const std::optional & format_settings, ContextPtr context, const String & write_format, - CompressionMethod write_compression_method) + CompressionMethod write_compression_method, + const String & table_location, + std::shared_ptr secondary_storages) { for (auto & [_, data_file] : initial_plan.path_to_data_file) { @@ -235,10 +256,15 @@ static void writeDataFiles( format_settings, // todo make compaction using same FormatParserSharedResources std::make_shared(context->getSettingsRef(), 1), - context); + context, + table_location, + secondary_storages); - RelativePathWithMetadata relative_path(data_file->data_object_info->getPath()); - auto read_buffer = createReadBuffer(relative_path, object_storage, context, getLogger("IcebergCompaction")); + ObjectStoragePtr storage_to_use = data_file->data_object_info->getResolvedStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + RelativePathWithMetadata object_info(data_file->data_object_info->getPath()); + auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction")); const Settings & settings = context->getSettingsRef(); auto parser_shared_resources = std::make_shared( @@ -392,6 +418,8 @@ void writeMetadataFiles( { manifest_entry->patched_path = plan.generator.generateManifestEntryName(); 
manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata; + + std::vector manifest_data_filenames(data_filenames.begin(), data_filenames.end()); auto buffer_manifest_entry = object_storage->writeObject( StoredObject(manifest_entry->patched_path.path_in_storage), WriteMode::Rewrite, @@ -409,7 +437,7 @@ void writeMetadataFiles( partition_columns, plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]), ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(), - std::vector(data_filenames.begin(), data_filenames.end()), + manifest_data_filenames, manifest_entry->statistics, sample_block_, snapshot, @@ -485,22 +513,46 @@ void writeMetadataFiles( } } -std::vector getOldFiles(ObjectStoragePtr object_storage, const String & table_path) +/// Files to delete after compaction: a base-storage directory listing under `metadata/` and +/// `data/` (covers historical metadata.json and any orphan files on the base storage), plus +/// any paths from the compacted snapshots that resolve to a secondary storage. 
+std::vector> getOldFiles( + ObjectStoragePtr object_storage, + SecondaryStorages & secondary_storages, + ContextPtr context, + const PersistentTableComponents & persistent_table_components, + const Plan & plan) { - auto metadata_files = listFiles(*object_storage, table_path, "metadata", ""); - auto data_files = listFiles(*object_storage, table_path, "data", ""); + std::vector> result; + + for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "metadata", "")) + result.emplace_back(object_storage, std::move(file)); + for (auto && file : listFiles(*object_storage, persistent_table_components.table_path, "data", "")) + result.emplace_back(object_storage, std::move(file)); + + for (const auto & raw_path : plan.referenced_file_paths) + { + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, + raw_path, + object_storage, + secondary_storages, + context); - for (auto && data_file : data_files) - metadata_files.push_back(data_file); + if (storage_to_use.get() != object_storage.get()) + result.emplace_back(std::move(storage_to_use), std::move(key_in_storage)); + } - return metadata_files; + return result; } -void clearOldFiles(ObjectStoragePtr object_storage, const std::vector & old_files) +void clearOldFiles(const std::vector> & old_files) { - for (const auto & metadata_file : old_files) + auto log = getLogger("IcebergCompaction"); + for (const auto & [storage, key] : old_files) { - object_storage->removeObjectIfExists(StoredObject(metadata_file)); + LOG_DEBUG(log, "Removing old file during compaction: storage={}, key={}", storage->getDescription(), key); + storage->removeObjectIfExists(StoredObject(key)); } } @@ -508,6 +560,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr object_storage_, + std::shared_ptr secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const 
std::optional & format_settings_, SharedHeader sample_block_, @@ -519,12 +572,14 @@ void compactIcebergTable( data_lake_settings, persistent_table_components, object_storage_, + *secondary_storages_, write_format, context_, persistent_table_components.metadata_compression_method); if (plan.need_optimize) { - auto old_files = getOldFiles(object_storage_, persistent_table_components.table_path); + auto old_files = getOldFiles( + object_storage_, *secondary_storages_, context_, persistent_table_components, plan); writeDataFiles( plan, sample_block_, @@ -532,9 +587,11 @@ void compactIcebergTable( format_settings_, context_, write_format, - persistent_table_components.metadata_compression_method); + persistent_table_components.metadata_compression_method, + persistent_table_components.table_location, + secondary_storages_); writeMetadataFiles(plan, object_storage_, context_, sample_block_, write_format, persistent_table_components.table_path); - clearOldFiles(object_storage_, old_files); + clearOldFiles(old_files); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h index 0916002f99f3..302bc3d30e69 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB::Iceberg @@ -15,6 +16,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, DB::ObjectStoragePtr object_storage_, + std::shared_ptr secondary_storages_, const DataLakeStorageSettings & data_lake_settings, const std::optional & format_settings_, DB::SharedHeader sample_block_, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp index 7046483523d5..2cb0281778f7 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp @@ -1,3 +1,4 @@ +#include #include "config.h" #include @@ -9,15 +10,18 @@ #include #include +#include #include #include #include +#include namespace DB::ErrorCodes { extern const int NOT_IMPLEMENTED; extern const int UNKNOWN_PROTOCOL; +extern const int PROTOCOL_VERSION_MISMATCH; } @@ -34,10 +38,15 @@ extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes; #if USE_AVRO IcebergDataObjectInfo::IcebergDataObjectInfo( - Iceberg::ProcessedManifestFileEntryPtr data_manifest_file_entry_, Int32 schema_id_relevant_to_iterator_) - : ObjectInfo(RelativePathWithMetadata(data_manifest_file_entry_->file_path)) + Iceberg::ProcessedManifestFileEntryPtr data_manifest_file_entry_, + const String & metadata_path_, + Int32 schema_id_relevant_to_iterator_, + ObjectStoragePtr resolved_storage_, + const String & resolved_key_) + : ObjectInfo(RelativePathWithMetadata(resolved_key_.empty() ? metadata_path_ : resolved_key_)) , info{ data_manifest_file_entry_->parsed_entry->file_path_key, + metadata_path_, data_manifest_file_entry_->resolved_schema_id, schema_id_relevant_to_iterator_, data_manifest_file_entry_->sequence_number, @@ -46,7 +55,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo( /* equality_deletes_objects */ {}, data_manifest_file_entry_->parsed_entry->record_count, data_manifest_file_entry_->parsed_entry->file_size_in_bytes} + , resolved_storage(std::move(resolved_storage_)) { + /// resolved_storage and resolved_key must be provided together or neither must be provided + /// (default-constructed, meaning the path has not been resolved yet). 
+ chassert(resolved_key_.empty() == (resolved_storage == nullptr)); } IcebergDataObjectInfo::IcebergDataObjectInfo(const RelativePathWithMetadata & path_) @@ -59,13 +72,15 @@ std::shared_ptr IcebergDataObjectInfo::getPositionDeleteTransf const SharedHeader & header, const std::optional & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_) + ContextPtr context_, + const String & table_location, + std::shared_ptr secondary_storages) { IcebergDataObjectInfoPtr self = shared_from_this(); if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value) - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages); else - return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_); + return std::make_shared(header, self, object_storage, format_settings, parser_shared_resources, context_, table_location, secondary_storages); } void IcebergDataObjectInfo::addPositionDeleteObject(Iceberg::ProcessedManifestFileEntryPtr position_delete_object) @@ -95,7 +110,30 @@ void IcebergDataObjectInfo::addEqualityDeleteObject(const Iceberg::ProcessedMani void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const { checkVersion(protocol_version); + + if (requires_external_storage && protocol_version < DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + throw Exception( + ErrorCodes::PROTOCOL_VERSION_MISMATCH, + "Iceberg data file '{}' is outside of the table location, " + "worker needs to have protocol version >= {}, but has {}. 
", + data_object_file_metadata_path, + DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH, + protocol_version); + } + + auto path_for_protocol = [&](const String & path) -> String + { + if (protocol_version < DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + return SchemeAuthorityKey(path).key; + return path; + }; + writeStringBinary(data_object_file_path_key, out); + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + writeStringBinary(data_object_file_metadata_path, out); + } writeVarInt(underlying_format_read_schema_id, out); writeVarInt(schema_id_relevant_to_iterator, out); writeVarInt(sequence_number, out); @@ -104,12 +142,12 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf writeVarUInt(position_deletes_objects.size(), out); for (const auto & pos_delete_obj : position_deletes_objects) { - writeStringBinary(pos_delete_obj.file_path, out); + writeStringBinary(path_for_protocol(pos_delete_obj.file_path), out); writeStringBinary(pos_delete_obj.file_format, out); if (pos_delete_obj.reference_data_file_path.has_value()) { writeVarUInt(1, out); - writeStringBinary(pos_delete_obj.reference_data_file_path.value(), out); + writeStringBinary(path_for_protocol(pos_delete_obj.reference_data_file_path.value()), out); } else { @@ -121,7 +159,7 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf writeVarUInt(equality_deletes_objects.size(), out); for (const auto & eq_delete_obj : equality_deletes_objects) { - writeStringBinary(eq_delete_obj.file_path, out); + writeStringBinary(path_for_protocol(eq_delete_obj.file_path), out); writeStringBinary(eq_delete_obj.file_format, out); writeVarInt(eq_delete_obj.schema_id, out); if (eq_delete_obj.equality_ids.has_value()) @@ -166,6 +204,10 @@ void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBu { checkVersion(protocol_version); 
readStringBinary(data_object_file_path_key, in); + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH) + { + readStringBinary(data_object_file_metadata_path, in); + } readVarInt(underlying_format_read_schema_id, in); readVarInt(schema_id_relevant_to_iterator, in); readVarInt(sequence_number, in); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h index 75c1bb9fba3d..ad9dc8356c05 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h @@ -16,6 +16,10 @@ namespace DB::Iceberg struct IcebergObjectSerializableInfo { String data_object_file_path_key; + /// Raw path string as written in the Iceberg manifest, preserved as-is (may be a full URI like + /// `s3://bucket/...` or a relative path). Used for the `_path` virtual column and as a stable + /// task identifier. 
+ String data_object_file_metadata_path; Int32 underlying_format_read_schema_id; Int32 schema_id_relevant_to_iterator; Int64 sequence_number; @@ -25,6 +29,9 @@ struct IcebergObjectSerializableInfo std::optional record_count; std::optional file_size_in_bytes; + /// Set to true by the coordinator when the file is outside of the table location + bool requires_external_storage = false; + void serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const; void deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version); @@ -37,6 +44,7 @@ struct IcebergObjectSerializableInfo #if USE_AVRO #include +#include #include @@ -49,7 +57,14 @@ struct IcebergDataObjectInfo : public ObjectInfo, std::enable_shared_from_this & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, - ContextPtr context_); + ContextPtr context_, + const String & table_location, + std::shared_ptr secondary_storages); std::optional getFileFormat() const override { return info.file_format; } void addPositionDeleteObject(Iceberg::ProcessedManifestFileEntryPtr position_delete_object); + std::optional getMetadataPath() const + { + if (info.data_object_file_metadata_path.empty()) + return std::nullopt; + return info.data_object_file_metadata_path; + } + + ObjectStoragePtr getResolvedStorage() const { return resolved_storage; } + + void setResolvedStorage(ObjectStoragePtr storage) { resolved_storage = std::move(storage); } + void addEqualityDeleteObject(const Iceberg::ProcessedManifestFileEntryPtr & equality_delete_object); Iceberg::IcebergObjectSerializableInfo info; + +private: + /// For files located in a different storage than the table's main storage + ObjectStoragePtr resolved_storage; }; using IcebergDataObjectInfoPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 97de3771264e..f1994c5a2b38 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -183,7 +184,8 @@ std::optional SingleThreadIcebergKeysIterator::ne local_context, log, mle.manifest_file_path, - mle.manifest_file_byte_size); + mle.manifest_file_byte_size, + *secondary_storages); current_manifest_file_iterator = Iceberg::ManifestFileIterator::create( manifest_file_cacheable_part.deserializer, @@ -213,7 +215,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( const ActionsDAG * filter_dag_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : object_storage(object_storage_) , filter_dag( [&]() -> std::shared_ptr @@ -236,6 +239,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( , data_snapshot(data_snapshot_) , persistent_components(persistent_components_) , log(getLogger("IcebergIterator")) + , secondary_storages(secondary_storages_) , manifest_file_content_type(manifest_file_content_type_) { } @@ -247,10 +251,12 @@ IcebergIterator::IcebergIterator( IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : logger(getLogger("IcebergIterator")) , filter_dag(filter_dag_ ? 
std::make_shared(filter_dag_->clone()) : nullptr) , object_storage(std::move(object_storage_)) + , local_context(local_context_) , table_state_snapshot(table_snapshot_) , persistent_components(persistent_components_) , data_files_iterator( @@ -260,7 +266,8 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , deletes_iterator( object_storage, local_context_, @@ -268,11 +275,13 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , blocking_queue(100) , producer_task(std::nullopt) , callback(std::move(callback_)) , table_schema_id(table_snapshot_->schema_id) + , secondary_storages(secondary_storages_) { auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) @@ -331,8 +340,17 @@ ObjectInfoPtr IcebergIterator::next(size_t) Iceberg::ProcessedManifestFileEntryPtr manifest_file_entry; if (blocking_queue.pop(manifest_file_entry)) { - IcebergDataObjectInfoPtr object_info - = std::make_shared(manifest_file_entry, table_state_snapshot->schema_id); + const auto & raw_metadata_path = manifest_file_entry->parsed_entry->file_path_key; + auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( + persistent_components.table_location, raw_metadata_path, + object_storage, *secondary_storages, local_context); + + IcebergDataObjectInfoPtr object_info = std::make_shared( + manifest_file_entry, raw_metadata_path, table_state_snapshot->schema_id, storage_to_use, resolved_key); + + object_info->info.requires_external_storage = (storage_to_use != object_storage); + + for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, /* is_equality_delete */ false, logger)) { @@ -400,6 +418,34 @@ ObjectInfoPtr IcebergIterator::next(size_t) manifest_file_entry->resolved_schema_id, /// file's schema id to 
interpret value_bounds bytes manifest_file_entry->parsed_entry->columns_infos, manifest_file_entry->parsed_entry->value_bounds)); + + if (!object_info->info.requires_external_storage) + { + for (const auto & pos_del : object_info->info.position_deletes_objects) + { + auto [del_storage, del_key] = resolveObjectStorageForPath( + persistent_components.table_location, pos_del.file_path, object_storage, *secondary_storages, local_context); + if (del_storage != object_storage) + { + object_info->info.requires_external_storage = true; + break; + } + } + } + if (!object_info->info.requires_external_storage) + { + for (const auto & eq_del : object_info->info.equality_deletes_objects) + { + auto [del_storage, del_key] = resolveObjectStorageForPath( + persistent_components.table_location, eq_del.file_path, object_storage, *secondary_storages, local_context); + if (del_storage != object_storage) + { + object_info->info.requires_external_storage = true; + break; + } + } + } + ProfileEvents::increment(ProfileEvents::IcebergMetadataReturnedObjectInfos); return object_info; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 95e372ccfb5c..551f3163701a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace DB { @@ -43,7 +44,8 @@ class SingleThreadIcebergKeysIterator const ActionsDAG * filter_dag_, TableStateSnapshotPtr table_snapshot_, IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components); + PersistentTableComponents persistent_components, + std::shared_ptr secondary_storages_); std::optional next(); @@ -56,6 +58,8 @@ class SingleThreadIcebergKeysIterator PersistentTableComponents persistent_components; LoggerPtr log; + std::shared_ptr secondary_storages; + size_t manifest_file_index = 0; 
Iceberg::ManifestIteratorPtr current_manifest_file_iterator; @@ -74,7 +78,8 @@ class IcebergIterator : public IObjectIterator IDataLakeMetadata::FileProgressCallback callback_, Iceberg::TableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - Iceberg::PersistentTableComponents persistent_components_); + Iceberg::PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_); ObjectInfoPtr next(size_t) override; @@ -85,6 +90,7 @@ class IcebergIterator : public IObjectIterator LoggerPtr logger; std::shared_ptr filter_dag; ObjectStoragePtr object_storage; + ContextPtr local_context; const Iceberg::TableStateSnapshotPtr table_state_snapshot; Iceberg::PersistentTableComponents persistent_components; Iceberg::SingleThreadIcebergKeysIterator data_files_iterator; @@ -97,6 +103,7 @@ class IcebergIterator : public IObjectIterator std::exception_ptr exception; std::mutex exception_mutex; Int32 table_schema_id; + std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 9d52ca68cc41..573aaa574e8a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -263,6 +263,7 @@ IcebergMetadata::IcebergMetadata( IcebergMetadataFilesCachePtr cache_ptr) : log(getLogger("IcebergMetadata")) , object_storage(std::move(object_storage_)) + , secondary_storages(std::make_shared()) , persistent_components(initializePersistentTableComponents(configuration_, cache_ptr, context_)) , data_lake_settings(configuration_->getDataLakeSettings()) , write_format(configuration_->getFormat()) @@ -323,7 +324,8 @@ void IcebergMetadata::backgroundMetadataPrefetcherThread() auto manifest_file_ptr = Iceberg::getManifestFile( object_storage, persistent_components, ctx, 
log, entry.manifest_file_path, - entry.manifest_file_byte_size); + entry.manifest_file_byte_size, + *secondary_storages); } } @@ -462,7 +464,8 @@ IcebergDataSnapshotPtr IcebergMetadata::createIcebergDataSnapshotFromSnapshotJSO local_context, getProperFilePathFromMetadataInfo( manifest_list_file_path, persistent_components.table_path, persistent_components.table_location), - log), + log, + *secondary_storages), snapshot_id, schema_id, total_rows, @@ -495,6 +498,7 @@ bool IcebergMetadata::optimize( snapshots_info, persistent_components, object_storage, + secondary_storages, data_lake_settings, format_settings, sample_block, @@ -1112,7 +1116,7 @@ bool IcebergMetadata::isDataSortedBySortingKey(StorageMetadataPtr storage_metada for (const auto & manifest_list_entry : data_snapshot->manifest_list_entries) { auto files_handle = getManifestFileEntriesHandle( - object_storage, persistent_components, context, log, manifest_list_entry, table_state_snapshot->schema_id); + object_storage, persistent_components, context, log, manifest_list_entry, table_state_snapshot->schema_id, *secondary_storages); if (!files_handle.areAllDataFilesSortedBySortOrderID(sorting_key.sort_order_id.value())) return false; @@ -1143,7 +1147,7 @@ std::optional IcebergMetadata::totalRows(ContextPtr local_context) const for (const auto & manifest_list_entry : actual_data_snapshot->manifest_list_entries) { auto manifest_file_ptr = getManifestFileEntriesHandle( - object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id); + object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id, *secondary_storages); auto data_count = manifest_file_ptr.getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA); auto position_deletes_count = manifest_file_ptr.getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE); if (!data_count.has_value() || !position_deletes_count.has_value()) 
@@ -1172,7 +1176,7 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons for (const auto & manifest_list_entry : actual_data_snapshot->manifest_list_entries) { auto manifest_file_ptr = getManifestFileEntriesHandle( - object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id); + object_storage, persistent_components, local_context, log, manifest_list_entry, actual_table_state_snapshot.schema_id, *secondary_storages); auto count = manifest_file_ptr.getBytesCountInAllDataFilesExcludingDeleted(); if (!count.has_value()) return {}; @@ -1241,7 +1245,8 @@ ObjectIterator IcebergMetadata::iterate( callback, iceberg_table_state, getRelevantDataSnapshotFromTableStateSnapshot(*iceberg_table_state, local_context), - persistent_components); + persistent_components, + secondary_storages); } NamesAndTypesList IcebergMetadata::getTableSchema(ContextPtr local_context) const @@ -1299,7 +1304,7 @@ void IcebergMetadata::addDeleteTransformers( LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->info.position_deletes_objects.size()); builder.addSimpleTransform( [&](const SharedHeader & header) - { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context); }); + { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, parser_shared_resources, local_context, persistent_components.table_location, secondary_storages); }); } const auto & delete_files = iceberg_object_info->info.equality_deletes_objects; if (!delete_files.empty()) @@ -1310,9 +1315,13 @@ void IcebergMetadata::addDeleteTransformers( { /// get header of delete file Block delete_file_header; - RelativePathWithMetadata delete_file_object(delete_file.file_path); + + auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath( + 
persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context); + + RelativePathWithMetadata delete_file_object(resolved_delete_key); { - auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -1335,7 +1344,7 @@ void IcebergMetadata::addDeleteTransformers( } /// Then we read the content of the delete file. auto mutable_columns_for_set = block_for_set.cloneEmptyColumns(); - std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto"); auto delete_format = FormatFactory::instance().getInput( delete_file.file_format, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index cd54eb235035..798abdc628d2 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -31,6 +31,7 @@ #include #include #include +#include namespace DB { @@ -148,6 +149,8 @@ class IcebergMetadata : public IDataLakeMetadata CompressionMethod getCompressionMethod() const { return persistent_components.metadata_compression_method; } + std::string getTableLocation() const override { return persistent_components.table_location; } + bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) 
override; bool supportsDelete() const override { return true; } void mutate( @@ -232,6 +235,7 @@ class IcebergMetadata : public IDataLakeMetadata LoggerPtr log; const ObjectStoragePtr object_storage; + mutable std::shared_ptr secondary_storages; DB::Iceberg::PersistentTableComponents persistent_components; const DataLakeStorageSettings & data_lake_settings; const String write_format; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index c3148e2a3f18..50b4fc845192 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp index be48c77381db..ff6eff7b1c7c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h index f46ec2823509..33b6ba6bbd15 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB::Iceberg { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index 8d7820f9ea71..a3e954f219f6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -68,11 +68,11 @@ void 
IcebergPositionDeleteTransform::initializeDeleteSources() for (const auto & position_deletes_object : iceberg_object_info->info.position_deletes_objects) { + auto [delete_storage_to_use, resolved_key] = resolveObjectStorageForPath( + table_location, position_deletes_object.file_path, object_storage, *secondary_storages, context); - auto object_path = position_deletes_object.file_path; - auto object_metadata = object_storage->getObjectMetadata(object_path, /*with_tags=*/ false); - auto object_info = RelativePathWithMetadata{object_path, object_metadata}; - + auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_key, /*with_tags=*/ false); + RelativePathWithMetadata object_info(resolved_key, object_metadata); String format = position_deletes_object.file_format; if (boost::to_lower_copy(format) != "parquet") @@ -80,7 +80,7 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() Block initial_header; { - std::unique_ptr read_buf_schema = createReadBuffer(object_info, object_storage, context, log); + std::unique_ptr read_buf_schema = createReadBuffer(object_info, delete_storage_to_use, context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -91,9 +91,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() initial_header = Block(initial_header_data); } - CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto"); + CompressionMethod compression_method = chooseCompressionMethod(resolved_key, "auto"); - delete_read_buffers.push_back(createReadBuffer(object_info, object_storage, context, log)); + delete_read_buffers.push_back(createReadBuffer(object_info, delete_storage_to_use, context, log)); auto syntax_result = TreeRewriter(context).analyze(where_ast, initial_header.getNamesAndTypesList()); ExpressionAnalyzer analyzer(where_ast, syntax_result, 
context); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h index 0062952ee7c6..c73ed04e0015 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h @@ -28,7 +28,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) + ContextPtr context_, + const String & table_location_, + std::shared_ptr secondary_storages_) : ISimpleTransform(header_, header_, false) , header(header_) , iceberg_object_info(iceberg_object_info_) @@ -36,6 +38,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform , format_settings(format_settings_) , context(context_) , parser_shared_resources(parser_shared_resources_) + , table_location(table_location_) + , secondary_storages(std::move(secondary_storages_)) { initializeDeleteSources(); } @@ -56,6 +60,9 @@ class IcebergPositionDeleteTransform : public ISimpleTransform ContextPtr context; FormatParserSharedResourcesPtr parser_shared_resources; + const String table_location; + std::shared_ptr secondary_storages; + /// We need to keep the read buffers alive since the delete_sources depends on them. 
std::vector> delete_read_buffers; std::vector> delete_sources; @@ -72,8 +79,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const String & table_location_, + std::shared_ptr secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, table_location_, std::move(secondary_storages_)) { initialize(); } @@ -98,8 +107,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran ObjectStoragePtr object_storage_, const std::optional & format_settings_, FormatParserSharedResourcesPtr parser_shared_resources_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_) + ContextPtr context_, + const String & table_location_, + std::shared_ptr secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, parser_shared_resources_, context_, table_location_, std::move(secondary_storages_)) { initialize(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp index c55e215cc95d..efdd08f89be1 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp @@ -70,7 +70,8 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( ContextPtr local_context, LoggerPtr log, const String & filename, - size_t bytes_size) + size_t bytes_size, 
+ SecondaryStorages & secondary_storages) { auto log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -79,15 +80,18 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata manifest_object_info(filename); + auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, filename, object_storage, secondary_storages, local_context); + + RelativePathWithMetadata manifest_object_info(resolved_key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings); - auto manifest_file_deserializer = std::make_unique( + auto buffer = createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings); + auto manifest_file_deserializer = std::make_shared( std::move(buffer), filename, getFormatSettings(local_context)); return Iceberg::ManifestFileCacheableInfo{std::move(manifest_file_deserializer), bytes_size}; @@ -108,7 +112,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH ContextPtr local_context, LoggerPtr log, const ManifestFileCacheKey & cache_key, - Int32 table_snapshot_schema_id) + Int32 table_snapshot_schema_id, + SecondaryStorages & secondary_storages) { auto cacheable_info = getManifestFile( object_storage, @@ -116,7 +121,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH local_context, log, cache_key.manifest_file_path, - static_cast(cache_key.manifest_file_byte_size)); + cache_key.manifest_file_byte_size, + secondary_storages); auto iterator = Iceberg::ManifestFileIterator::create( cacheable_info.deserializer, @@ -144,7 +150,8 @@ 
ManifestFileCacheKeys getManifestList( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, const String & filename, - LoggerPtr log) + LoggerPtr log, + SecondaryStorages & secondary_storages) { IcebergMetadataLogLevel log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -153,14 +160,17 @@ ManifestFileCacheKeys getManifestList( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata object_info(filename); + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, filename, object_storage, secondary_storages, local_context); + + RelativePathWithMetadata object_info(key_in_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto manifest_list_buf = createReadBuffer(object_info, object_storage, local_context, log, read_settings); + auto manifest_list_buf = createReadBuffer(object_info, storage_to_use, local_context, log, read_settings); AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context)); ManifestFileCacheKeys manifest_file_cache_keys; @@ -209,7 +219,7 @@ ManifestFileCacheKeys getManifestList( manifest_list_deserializer.getValueFromRowByName(i, f_content, TypeIndex::Int32).safeGet()); } manifest_file_cache_keys.emplace_back( - manifest_file_name, manifest_length, added_sequence_number, added_snapshot_id.safeGet(), content_type); + manifest_file_name, static_cast(manifest_length), added_sequence_number, added_snapshot_id.safeGet(), content_type); auto dump_row_metadata = [&]()->String { return manifest_list_deserializer.getContent(i); }; insertRowToLogTable( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h 
b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h index ac5a7f7bc06e..9fa67560e9df 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h @@ -19,6 +19,7 @@ #include #include +#include namespace DB::Iceberg { @@ -29,7 +30,8 @@ Iceberg::ManifestFileCacheableInfo getManifestFile( ContextPtr local_context, LoggerPtr log, const String & filename, - size_t bytes_size); + size_t bytes_size, + SecondaryStorages & secondary_storages); /// Creates a fully initialized ManifestFileIterator from a cache key. /// All entries are drained so that aggregate methods (e.g. getRowsCountInAllFilesExcludingDeleted) @@ -40,7 +42,8 @@ Iceberg::ManifestFileIterator::ManifestFileEntriesHandle getManifestFileEntriesH ContextPtr local_context, LoggerPtr log, const ManifestFileCacheKey & cache_key, - Int32 table_snapshot_schema_id); + Int32 table_snapshot_schema_id, + SecondaryStorages & secondary_storages); ManifestFileCacheKeys getManifestList( @@ -48,7 +51,8 @@ ManifestFileCacheKeys getManifestList( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, const String & filename, - LoggerPtr log); + LoggerPtr log, + SecondaryStorages & secondary_storages); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 923a97a222e4..b78ca0fd0560 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -37,12 +37,13 @@ #include #include #include +#include #if USE_AVRO #include -#include #include +#include #include #include #include @@ -99,7 +100,6 @@ static constexpr auto MAX_TRANSACTION_RETRIES = 100; namespace DB::Iceberg { - using namespace DB; static CompressionMethod getCompressionMethodFromMetadataFile(const String & path) { @@ -1529,3 +1529,29 @@ void 
sortBlockByKeyDescription(Block & block, const KeyDescription & sort_descri } #endif + +namespace DB +{ + +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + { + if (auto resolved = iceberg_info->getResolvedStorage()) + return resolved; + } +#endif + return default_storage; +} + +std::optional getMetadataPathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info) +{ +#if USE_AVRO + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + return iceberg_info->getMetadataPath(); +#endif + return std::nullopt; +} + +} diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 7695ca7edeff..2b11f1acdbd9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -1,5 +1,7 @@ #pragma once +#include +#include "config.h" #include #include #include @@ -12,9 +14,20 @@ #include #include +#include + +namespace DB +{ +struct ObjectInfo; +using ObjectInfoPtr = std::shared_ptr; + +/// These functions are always available; they return fallback values when USE_AVRO is not defined +ObjectStoragePtr getResolvedStorageFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info, const ObjectStoragePtr & default_storage); +std::optional getMetadataPathFromObjectInfo([[maybe_unused]] const ObjectInfoPtr & object_info); +} + #if USE_AVRO -#include #include #include #include @@ -49,8 +62,7 @@ bool writeMetadataFileAndVersionHint( DB::ObjectStoragePtr object_storage, DB::ContextPtr context, DB::CompressionMethod compression_method, - bool try_write_version_hint -); + bool try_write_version_hint); std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location); diff --git 
a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index ee0cf191045c..5b696970a40f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -202,11 +203,17 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( { const bool expect_whole_archive = !local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; + /// Use the full table location URI (e.g. `s3a://bucket/prefix/table/`) when available + std::string table_location = configuration->getPathForRead().path; + if (auto * metadata = configuration->getExternalMetadata()) + table_location = metadata->getTableLocation(); + auto distributed_iterator = std::make_unique( local_context->getClusterFunctionReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads], /*is_archive_=*/is_archive && !expect_whole_archive, object_storage, + table_location, local_context); if (is_archive && expect_whole_archive) @@ -451,11 +458,13 @@ Chunk StorageObjectStorageSource::generate() path); } + std::string path_for_virtual_column = getMetadataPathFromObjectInfo(object_info).value_or(path); + VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk( chunk, read_from_format_info.requested_virtual_columns, { - .path = path, + .path = path_for_virtual_column, .size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_metadata->size_bytes, .filename = &filename, .last_modified = object_metadata->last_modified, @@ -714,16 +723,18 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags"); const auto & path = object_info->isArchive() ? 
object_info->getPathToArchive() : object_info->getPath(); + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(object_info, object_storage); + if (query_settings.ignore_non_existent_file) { - auto metadata = object_storage->tryGetObjectMetadata(path, with_tags); + auto metadata = storage_to_use->tryGetObjectMetadata(path, with_tags); if (!metadata) return {}; object_info->setObjectMetadata(metadata.value()); } else - object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags)); + object_info->setObjectMetadata(storage_to_use->getObjectMetadata(path, with_tags)); } if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 @@ -970,7 +981,11 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else { compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); - read_buf = createReadBuffer(object_info->relative_path_with_metadata, object_storage, context_, log); + read_buf = createReadBuffer( + object_info->relative_path_with_metadata, + getResolvedStorageFromObjectInfo(object_info, object_storage), + context_, + log); } Block initial_header = read_from_format_info.format_header; @@ -1631,11 +1646,13 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_) : WithContext(context_) , callback(callback_) , is_archive(is_archive_) , object_storage(object_storage_) + , table_location(table_location_) { if (!getContext()->isSwarmModeEnabled()) { @@ -1667,7 +1684,32 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( { auto object = object_future.get(); if (object) + { +#if USE_AVRO + /// For Iceberg objects, resolve the storage from the raw metadata path + if (auto iceberg_info = std::dynamic_pointer_cast(object)) + { + if (!iceberg_info->getResolvedStorage()) + { 
+ if (auto metadata_path = iceberg_info->getMetadataPath()) + { + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, *metadata_path, object_storage, secondary_storages, getContext()); + if (!key.empty()) + { + iceberg_info->setResolvedStorage(storage_to_use); + /// For base storage, keep the key already resolved by the coordinator + /// so we don't re-derive a different `table_location` -> `table_root` + /// translation here. + if (storage_to_use != object_storage) + iceberg_info->relative_path_with_metadata.relative_path = key; + } + } + } + } +#endif buffer.push_back(object); + } } } @@ -1684,10 +1726,35 @@ ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t) return nullptr; } - auto task = callback(); - if (!task || task->isEmpty()) + auto raw = callback(); + if (!raw || raw->isEmpty()) return nullptr; - object_info = task->getObjectInfo(); + + object_info = raw->getObjectInfo(); + +#if USE_AVRO + /// For Iceberg objects, resolve the storage from the raw metadata path + if (auto iceberg_info = std::dynamic_pointer_cast(object_info)) + { + if (!iceberg_info->getResolvedStorage()) + { + if (auto metadata_path = iceberg_info->getMetadataPath()) + { + auto [storage_to_use, key] = resolveObjectStorageForPath( + table_location, *metadata_path, object_storage, secondary_storages, getContext()); + if (!key.empty()) + { + iceberg_info->setResolvedStorage(storage_to_use); + /// For base storage, keep the key already resolved by the coordinator + /// so we don't re-derive a different `table_location` -> `table_root` + /// translation here. 
+ if (storage_to_use != object_storage) + iceberg_info->relative_path_with_metadata.relative_path = key; + } + } + } + } +#endif } else { @@ -1782,7 +1849,10 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o /* path_to_archive */ object_info->getPath(), /* archive_read_function */ [=, this]() - { return createReadBuffer(object_info->relative_path_with_metadata, object_storage, getContext(), log); }, + { + auto storage = getResolvedStorageFromObjectInfo(object_info, object_storage); + return createReadBuffer(object_info->relative_path_with_metadata, storage, getContext(), log); + }, /* archive_size */ size); } @@ -1804,7 +1874,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor } if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); file_enumerator = archive_reader->firstFile(); @@ -1830,7 +1903,10 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->getObjectMetadata()) - archive_object->setObjectMetadata(object_storage->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + { + ObjectStoragePtr storage_to_use = getResolvedStorageFromObjectInfo(archive_object, object_storage); + archive_object->setObjectMetadata(storage_to_use->getObjectMetadata(archive_object->getPath(), /*with_tags=*/ false)); + } archive_reader = createArchiveReader(archive_object); if (!archive_reader->fileExists(path_in_archive)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 
6326fa6ee89f..5d7e27f2819e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -170,6 +171,7 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri size_t max_threads_count, bool is_archive_, ObjectStoragePtr object_storage_, + const std::string & table_location_, ContextPtr context_); ObjectInfoPtr next(size_t) override; @@ -184,6 +186,10 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri std::atomic_size_t index = 0; bool is_archive; ObjectStoragePtr object_storage; + std::string table_location; +#if USE_AVRO + SecondaryStorages secondary_storages; /// For Iceberg: cache of storages for external file locations +#endif /// path_to_archive -> archive reader. std::unordered_map> archive_readers; std::mutex archive_readers_mutex; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 06af26c0ba98..c46bc5b45edc 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -275,6 +277,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s ++it; } + LOG_TRACE( log, "No unprocessed file for replica {}, need to retry after {} us", @@ -342,6 +345,11 @@ String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPt } return file_identifier; } + /// For Iceberg data files, the same data file can be referenced by different keys depending + /// on which storage actually holds it. Use the metadata path (as written in the manifest) as + /// the stable identifier so the same file consistently maps to the same replica. 
+ if (auto metadata_path = getMetadataPathFromObjectInfo(file_object); metadata_path.has_value()) + return *metadata_path; return file_object->getIdentifier(); } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 542329f76944..0f912ae779fc 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -14,6 +14,23 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#if USE_AWS_S3 +#include +#endif +#if USE_AZURE_BLOB_STORAGE +#include +#endif +#if USE_HDFS +#include +#endif + namespace DB { @@ -23,6 +40,203 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int PATH_ACCESS_DENIED; +} + +namespace +{ + +#if USE_AVRO +std::string normalizeScheme(const std::string & scheme) +{ + auto scheme_lowercase = Poco::toLower(scheme); + + if (scheme_lowercase == "s3a" || scheme_lowercase == "s3n" || scheme_lowercase == "gs" || scheme_lowercase == "gcs" || scheme_lowercase == "oss") + scheme_lowercase = "s3"; + else if (scheme_lowercase == "wasb" || scheme_lowercase == "wasbs" || scheme_lowercase == "abfss") + scheme_lowercase = "abfs"; + + return scheme_lowercase; +} + +std::string factoryTypeForScheme(const std::string & normalized_scheme) +{ + if (normalized_scheme == "s3") return "s3"; + if (normalized_scheme == "abfs") return "azure"; + if (normalized_scheme == "hdfs") return "hdfs"; + if (normalized_scheme == "file") return "local"; + return ""; +} + +#if USE_AWS_S3 +/// For s3:// URIs (generic), bucket needs to match. +/// For explicit http(s):// URIs, both bucket and endpoint must match. 
+bool s3URIMatches(const S3::URI & target_uri, const std::string & base_bucket, const std::string & base_endpoint, const std::string & target_scheme_normalized) +{ + bool bucket_matches = (target_uri.bucket == base_bucket); + bool endpoint_matches = (target_uri.endpoint == base_endpoint); + bool is_generic_s3_uri = (target_scheme_normalized == "s3"); + return bucket_matches && (endpoint_matches || is_generic_s3_uri); +} + +bool sameEndpoint(const std::string & a, const std::string & b) +{ + SchemeAuthorityKey pa(a); + SchemeAuthorityKey pb(b); + if (pa.authority.empty() || pb.authority.empty()) + return false; + return pa.scheme == pb.scheme && pa.authority == pb.authority; +} +#endif +std::pair getOrCreateStorageAndKey( + const std::string & cache_key, + const std::string & key_to_use, + const std::string & storage_type, + SecondaryStorages & secondary_storages, + const ContextPtr & context, + std::function configure_fn) +{ + std::lock_guard lock(secondary_storages.mutex); + if (auto it = secondary_storages.storages.find(cache_key); it != secondary_storages.storages.end()) + return {it->second, key_to_use}; + + Poco::AutoPtr cfg(new Poco::Util::MapConfiguration); + const std::string config_prefix = "object_storages." + cache_key; + + cfg->setString(config_prefix + ".object_storage_type", storage_type); + + configure_fn(*cfg, config_prefix); + + /// Create under lock to avoid duplicate creation and wasted work + ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); + + secondary_storages.storages.emplace(cache_key, storage); + return {storage, key_to_use}; +} +#endif + +bool isAbsolutePath(const std::string & path) +{ + if (!path.empty() && (path.front() == '/' || path.find("://") != std::string_view::npos)) + return true; + + return false; +} + +/// Normalize a path string by removing redundant components and leading slashes. 
+std::string normalizePathString(const std::string & path) +{ + std::filesystem::path fs_path(path); + std::filesystem::path normalized = fs_path.lexically_normal(); + + std::string normalized_result = normalized.string(); + + while (!normalized_result.empty() && normalized_result.front() == '/') + normalized_result = normalized_result.substr(1); + + return normalized_result; +} + +/// Convert a path (relative to table location or absolute path) to a key that will be looked up in the object storage. +/// +/// - If `table_location` is empty, the path is treated as already relative to storage root. +/// - If `path` is an absolute path, its key component (without scheme/authority) is returned. +/// - If `table_location` parses to a URI whose key part is empty, `path` is returned unchanged (exception will be thrown when looking up non-existing object in storage) +/// +/// - Otherwise, `path` is treated as relative to `table_location`'s key: +/// leading '/' stripped, concatenated to table_location key, and the result is normalized. 
+std::string convertPathToKeyInStorage(const std::string & table_location, const std::string & path) +{ + if (table_location.empty()) + { + if (!path.empty() && path.front() == '/') + return path.substr(1); + return path; + } + + if (isAbsolutePath(path)) + return SchemeAuthorityKey(path).key; // Absolute path, return the key part + + SchemeAuthorityKey base{table_location}; + if (base.key.empty()) + return path; // Table location has no key part, return the path as is + + std::string base_key_trimmed = base.key; + while (!base_key_trimmed.empty() && base_key_trimmed.front() == '/') + base_key_trimmed = base_key_trimmed.substr(1); + while (!base_key_trimmed.empty() && base_key_trimmed.back() == '/') + base_key_trimmed.pop_back(); + + std::string rel_path = path; + while (!rel_path.empty() && rel_path.front() == '/') + rel_path = rel_path.substr(1); + + auto reattach_slash = [&](std::string s) -> std::string + { + if (base.scheme == "file" && !s.empty() && s.front() != '/') + return "/" + s; + return s; + }; + + if (!base_key_trimmed.empty() && (rel_path == base_key_trimmed || rel_path.starts_with(base_key_trimmed + "/"))) + return reattach_slash(normalizePathString(rel_path)); // Path already includes table location + + std::string result = base.key; + if (!result.empty() && result.back() != '/') + result += '/'; + result += rel_path; + + return reattach_slash(normalizePathString(result)); +} + +} + +SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri) +{ + if (uri.empty()) + return; + + if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos) + { + scheme = Poco::toLower(uri.substr(0, scheme_sep)); + auto rest = uri.substr(scheme_sep + 3); // skip :// + + // authority is up to next '/' + auto slash = rest.find('/'); + if (slash == std::string_view::npos) + { + /// Bad URI: missing path component after authority. + /// Exception will be thrown when looking up non-existing object in the storage, so we can just return here. 
+ authority = std::string(rest); + key = "/"; + return; + } + authority = std::string(rest.substr(0, slash)); + /// For file:// URIs, the path is absolute, so we need to keep the leading '/' + /// e.g. file:///home/user/data -> scheme="file", authority="", key="/home/user/data" + if (scheme == "file") + key = std::string(rest.substr(slash)); + else + key = std::string(rest.substr(++slash)); + return; + } + + /// Check for scheme:/path (common for file: https://datatracker.ietf.org/doc/html/rfc8089#appendix-B) + if (auto colon = uri.find(':'); colon != std::string_view::npos && colon > 0) + { + auto after_colon = uri.substr(colon + 1); + + if (!after_colon.empty() && after_colon[0] == '/') + { + scheme = Poco::toLower(uri.substr(0, colon)); + authority = ""; // No authority + key = std::string(after_colon); + return; + } + } + + // Relative path (paths starting with '/' without a scheme are now handled by the caller) + key = std::string(uri); } std::optional checkAndGetNewFileOnInsertIfNeeded( @@ -256,5 +470,376 @@ extern const SettingsUInt64 max_download_buffer_size; extern const SettingsBool use_cache_for_count_from_files; extern const SettingsString filesystem_cache_name; extern const SettingsUInt64 filesystem_cache_boundary_alignment; +extern const SettingsBool s3_propagate_credentials_to_other_storages; +} + +std::string makeAbsolutePath(const std::string & table_location, const std::string & path) +{ + if (isAbsolutePath(path)) + return path; + + auto table_location_decomposed = SchemeAuthorityKey(table_location); + + std::string normalized_key = convertPathToKeyInStorage(table_location, path); + + if (!table_location_decomposed.scheme.empty()) + { + const std::string sep = (!normalized_key.empty() && normalized_key.front() == '/') ? 
"" : "/"; + return table_location_decomposed.scheme + "://" + table_location_decomposed.authority + sep + normalized_key; + } + + return normalized_key; +} + +#if USE_AVRO +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context) +{ + if (!isAbsolutePath(path)) + return {base_storage, convertPathToKeyInStorage(table_location, path)}; // Relative path definitely goes to base storage + + auto ensure_local_path_inside_user_files = [&](const std::string & local_path) + { + const auto target_path = std::filesystem::path(local_path).lexically_normal(); + const auto user_files_path = std::filesystem::path(context->getUserFilesPath()).lexically_normal(); + + if (user_files_path.empty() || !fileOrSymlinkPathStartsWith(target_path.string(), user_files_path.string())) + throw DB::Exception( + DB::ErrorCodes::PATH_ACCESS_DENIED, + "File URI '{}' is outside of allowed `user_files` path '{}'", + local_path, + user_files_path.string()); + }; + + SchemeAuthorityKey table_location_decomposed{table_location}; + SchemeAuthorityKey target_decomposed{path}; + + if (target_decomposed.scheme.empty() && target_decomposed.key.starts_with('/')) + { + if (base_storage->getType() == ObjectStorageType::Local) + ensure_local_path_inside_user_files(target_decomposed.key); + + return {base_storage, convertPathToKeyInStorage(table_location, target_decomposed.key)}; + } + + const std::string base_scheme_normalized = normalizeScheme(table_location_decomposed.scheme); + const std::string target_scheme_normalized = normalizeScheme(target_decomposed.scheme); + + /// `file://` paths must stay inside `user_files`. + /// Without this check, metadata could drive reads from arbitrary local paths. 
+ if (target_scheme_normalized == "file") + { + ensure_local_path_inside_user_files(target_decomposed.key); + } + + // For S3 URIs, use S3::URI to properly handle all kinds of URIs, e.g. https://s3.amazonaws.com/bucket/... == s3://bucket/... + #if USE_AWS_S3 + if (target_scheme_normalized == "s3" || target_scheme_normalized == "https" || target_scheme_normalized == "http") + { + std::string normalized_path = path; + if (target_decomposed.scheme == "s3a" || target_decomposed.scheme == "s3n" || target_decomposed.scheme == "oss") + { + normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key; + } + else if (target_decomposed.scheme == "gcs") + { + normalized_path = "gs://" + target_decomposed.authority + "/" + target_decomposed.key; + } + /// Paths from metadata already have correct encoding; disable Poco::URI + /// percent-decoding so that keys like `col=12%3A00%3A00` are preserved as-is. + S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, + /*keep_presigned_query_parameters*/ true, /*uri_style*/ S3UriStyle::AUTO, + /*enable_url_encoding*/ false); + + std::string key_to_use = s3_uri.key; + + bool use_base_storage = false; + if (base_storage->getType() == ObjectStorageType::S3) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_bucket = s3_storage->getObjectsNamespace(); + const std::string base_endpoint = s3_storage->getDescription(); + + if (s3URIMatches(s3_uri, base_bucket, base_endpoint, target_scheme_normalized)) + use_base_storage = true; + } + } + + if (!use_base_storage && (base_scheme_normalized == "s3" || base_scheme_normalized == "https" || base_scheme_normalized == "http")) + { + std::string normalized_table_location = table_location; + if (table_location_decomposed.scheme == "s3a" || table_location_decomposed.scheme == "s3n" || table_location_decomposed.scheme == "oss") + { + normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + 
table_location_decomposed.key; + } + else if (table_location_decomposed.scheme == "gcs") + { + normalized_table_location = "gs://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, + /*keep_presigned_query_parameters*/ true, /*uri_style*/ S3UriStyle::AUTO, + /*enable_url_encoding*/ false); + + if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) + use_base_storage = true; + } + + if (use_base_storage) + return {base_storage, key_to_use}; + + /// Include credential-propagation flag in the cache key: `configure_fn` runs only on miss, + /// so different per-query values of `s3_propagate_credentials_to_other_storages` must not share an entry. + const bool propagate_creds = context->getSettingsRef()[Setting::s3_propagate_credentials_to_other_storages]; + const std::string storage_cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint) + + "#propagate=" + (propagate_creds ? "1" : "0"); + + return getOrCreateStorageAndKey( + storage_cache_key, + key_to_use, + "s3", + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + bool endpoint_explicit = (target_decomposed.scheme == "http" || target_decomposed.scheme == "https"); + + std::string endpoint_to_use; + + if (endpoint_explicit) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? 
("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + else + { + std::string base_endpoint; + if (base_storage->getType() == ObjectStorageType::S3) + base_endpoint = base_storage->getDescription(); + + if (!base_endpoint.empty()) + { + if (base_endpoint.find(".s3.") != std::string::npos && base_endpoint.find(".amazonaws.com") != std::string::npos) + { + /// AWS-style: https://oldbucket.s3.us-east-1.amazonaws.com -> https://newbucket.s3.us-east-1.amazonaws.com + size_t s3_pos = base_endpoint.find(".s3."); + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + std::string scheme = base_endpoint.substr(0, scheme_end + 3); + std::string suffix = base_endpoint.substr(s3_pos); + + /// Trim path after endpoint + size_t slash_pos = suffix.find('/', 1); + if (slash_pos != std::string::npos) + suffix = suffix.substr(0, slash_pos); + endpoint_to_use = scheme + s3_uri.bucket + suffix; + } + } + else + { + /// Path-style (e.g. minio): http://host:port/oldbucket -> http://host:port/newbucket + size_t scheme_end = base_endpoint.find("://"); + if (scheme_end != std::string::npos) + { + size_t path_start = base_endpoint.find('/', scheme_end + 3); + if (path_start != std::string::npos) + base_endpoint = base_endpoint.substr(0, path_start); + } + if (!base_endpoint.empty() && base_endpoint.back() == '/') + base_endpoint.pop_back(); + endpoint_to_use = base_endpoint + "/" + s3_uri.bucket; + } + } + + /// Fallback: base storage is not S3 + if (endpoint_to_use.empty()) + { + endpoint_to_use = s3_uri.endpoint.empty() + ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + } + } + + cfg.setString(config_prefix + ".endpoint", endpoint_to_use); + + /// Copy credentials from base storage when the endpoint is the same or + /// `s3_propagate_credentials_to_other_storages` is enabled. 
+ if (base_storage->getType() == ObjectStorageType::S3 + && (context->getSettingsRef()[Setting::s3_propagate_credentials_to_other_storages] + || sameEndpoint(base_storage->getDescription(), endpoint_to_use))) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + if (auto s3_client = s3_storage->tryGetS3StorageClient()) + { + const auto credentials = s3_client->getCredentials(); + const String & access_key_id = credentials.GetAWSAccessKeyId(); + const String & secret_access_key = credentials.GetAWSSecretKey(); + const String & session_token = credentials.GetSessionToken(); + const String & region = s3_client->getRegion(); + + if (!access_key_id.empty()) + cfg.setString(config_prefix + ".access_key_id", access_key_id); + if (!secret_access_key.empty()) + cfg.setString(config_prefix + ".secret_access_key", secret_access_key); + if (!session_token.empty()) + cfg.setString(config_prefix + ".session_token", session_token); + if (!region.empty()) + cfg.setString(config_prefix + ".region", region); + } + } + } + }); + } + #endif + + #if USE_HDFS + if (target_scheme_normalized == "hdfs") + { + bool use_base_storage = false; + + // Check if base_storage matches (only if it's HDFS) + if (base_storage->getType() == ObjectStorageType::HDFS) + { + if (auto hdfs_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_url = hdfs_storage->getDescription(); + // Extract endpoint from base URL (hdfs://namenode:port/path -> hdfs://namenode:port) + std::string base_endpoint; + if (auto pos = base_url.find('/', base_url.find("//") + 2); pos != std::string::npos) + base_endpoint = base_url.substr(0, pos); + else + base_endpoint = base_url; + + // For HDFS, compare endpoints (namenode addresses) + std::string target_endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + + if (base_endpoint == target_endpoint) + use_base_storage = true; + + // Also check if table_location matches + if (!use_base_storage && 
base_scheme_normalized == "hdfs") + { + if (table_location_decomposed.authority == target_decomposed.authority) + use_base_storage = true; + } + } + } + + if (use_base_storage) + return {base_storage, target_decomposed.key}; + } + #endif + + /// Fallback for schemes not handled above (e.g., abfs, file) + if (base_scheme_normalized == target_scheme_normalized && table_location_decomposed.authority == target_decomposed.authority) + return {base_storage, target_decomposed.key}; + + const std::string type_for_factory = factoryTypeForScheme(target_scheme_normalized); + if (type_for_factory.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported storage scheme '{}' in path '{}'", target_scheme_normalized, path); + + /// For `file://` URIs the authority is always empty, so using just `"file://"` as the + /// cache key would cause every directory to share a single `LocalObjectStorage` instance + /// whose root (`key_prefix`) is set to the parent directory of the first file ever seen. + /// To avoid this, include the parent directory of the target file in the cache key so that + /// each directory gets its own storage instance with the correct root. 
+ std::string file_dir_path; // only set for file:// URIs + std::string cache_key; + if (target_scheme_normalized == "file") + { + std::filesystem::path fs_path(target_decomposed.key); + file_dir_path = fs_path.parent_path().string(); + if (file_dir_path.empty() || file_dir_path == "/") + file_dir_path = "/"; + else if (file_dir_path.back() != '/') + file_dir_path += '/'; + cache_key = "file://" + file_dir_path; + } + else + { + cache_key = target_scheme_normalized + "://" + target_decomposed.authority; + } + + /// Handle storage types that need new storage creation + return getOrCreateStorageAndKey( + cache_key, + target_decomposed.key, + type_for_factory, + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + if (target_scheme_normalized == "file") + { + cfg.setString(config_prefix + ".path", file_dir_path); + } + else if (target_scheme_normalized == "abfs") + { + std::string container_name; + std::string account_name; + const auto & authority = target_decomposed.authority; + + auto at_pos = authority.find('@'); + if (at_pos != std::string::npos) + { + container_name = authority.substr(0, at_pos); + account_name = authority.substr(at_pos + 1); + /// Remove .dfs.core.windows.net suffix if present + auto suffix_pos = account_name.find('.'); + if (suffix_pos != std::string::npos) + account_name = account_name.substr(0, suffix_pos); + } + else + container_name = authority; + + cfg.setString(config_prefix + ".container_name", container_name); + if (!account_name.empty()) + cfg.setString(config_prefix + ".account_name", account_name); + +#if USE_AZURE_BLOB_STORAGE + /// Copy credentials from base Azure storage if available + if (base_storage->getType() == ObjectStorageType::Azure) + { + if (auto azure_storage = std::dynamic_pointer_cast(base_storage)) + { + const auto & conn_params = azure_storage->getConnectionParameters(); + const auto & auth_method = azure_storage->getAzureBlobStorageAuthMethod(); + + if 
(std::holds_alternative(auth_method)) + { + cfg.setString(config_prefix + ".connection_string", + std::get(auth_method).toUnderType()); + } + else + { + const auto & endpoint = conn_params.endpoint; + if (!endpoint.storage_account_url.empty()) + cfg.setString(config_prefix + ".storage_account_url", endpoint.storage_account_url); + if (account_name.empty() && !endpoint.account_name.empty()) + cfg.setString(config_prefix + ".account_name", endpoint.account_name); + } + } + } +#endif + } + else if (target_scheme_normalized == "hdfs") + { + // HDFS endpoint must end with '/' + auto endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + if (!endpoint.empty() && endpoint.back() != '/') + endpoint.push_back('/'); + cfg.setString(config_prefix + ".endpoint", endpoint); + } + }); } + +#endif + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 5cc48a5d581d..b1278881ea62 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -2,11 +2,38 @@ #include #include +#include +#include +#include + namespace DB { class IObjectStorage; +#if USE_AVRO +/// Thread-safe wrapper for secondary object storages map +/// (now only used for Iceberg) +struct SecondaryStorages +{ + mutable std::mutex mutex; + std::map storages; +}; +#endif + +// A URI split into components +// s3://bucket/a/b -> scheme="s3", authority="bucket", key="a/b" +// file:///var/x -> scheme="file", authority="", key="/var/x" +// /abs/p -> scheme="", authority="", key="/abs/p" +struct SchemeAuthorityKey +{ + explicit SchemeAuthorityKey(const std::string & uri); + + std::string scheme; + std::string authority; + std::string key; +}; + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -62,5 +89,18 @@ struct ParseFromDiskResult ParseFromDiskResult parseFromDisk(ASTs args, bool with_structure, ContextPtr context, const fs::path 
& prefix); +std::string makeAbsolutePath(const std::string & table_location, const std::string & path); + +#if USE_AVRO +/// Resolve object storage and key for reading from that storage +/// If path is relative -- it must be read from base_storage +/// Otherwise, look for a suitable storage in secondary_storages +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context); +#endif } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 81148b1514cb..9f51fdb2efea 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -812,10 +812,10 @@ std::function IStorageURLBase::getReadPOSTDataCallback( namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext + class StorageURLReadBufferIterator : public IReadBufferIterator, WithContext { public: - ReadBufferIterator( + StorageURLReadBufferIterator( const std::vector & urls_to_check_, std::optional format_, const CompressionMethod & compression_method_, @@ -1045,7 +1045,7 @@ std::pair IStorageURLBase::getTableStructureAndForma else urls_to_check = {uri}; - ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); + StorageURLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); if (format) return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format}; return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context); diff --git a/tests/integration/test_storage_iceberg_multistorage/__init__.py b/tests/integration/test_storage_iceberg_multistorage/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml 
b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml new file mode 100644 index 000000000000..54c08b27abe8 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/cluster.xml @@ -0,0 +1,20 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml new file mode 100644 index 000000000000..516e4ba63a3a --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/named_collections.xml @@ -0,0 +1,15 @@ + + + + http://minio1:9001/root/ + minio + ClickHouse_Minio_P@ssw0rd + + + devstoreaccount1 + Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml new file mode 100644 index 000000000000..a63e91f41fbc --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/config.d/query_log.xml @@ -0,0 +1,6 @@ + + + system + query_log
+
+
diff --git a/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml new file mode 100644 index 000000000000..4b6ba057ecb1 --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/configs/users.d/users.xml @@ -0,0 +1,9 @@ + + + + + default + 1 + + + diff --git a/tests/integration/test_storage_iceberg_multistorage/test.py b/tests/integration/test_storage_iceberg_multistorage/test.py new file mode 100644 index 000000000000..5b477055b2ab --- /dev/null +++ b/tests/integration/test_storage_iceberg_multistorage/test.py @@ -0,0 +1,426 @@ +import pytest +import pyspark +import os +import shutil +import tempfile +import json +import avro.datafile +import avro.io + +from helpers.cluster import ClickHouseCluster +from helpers.s3_tools import ( + LocalUploader, + S3Uploader, + AzureUploader, + LocalDownloader, + S3Downloader, + prepare_s3_bucket, +) +from helpers.iceberg_utils import ( + get_uuid_str, + default_upload_directory, + default_download_directory, +) + +def get_spark(): + builder = ( + pyspark.sql.SparkSession.builder.appName("test_storage_iceberg_multistorage") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.iceberg.spark.SparkSessionCatalog", + ) + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", "/var/lib/clickhouse/user_files/iceberg_data") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + ) + .master("local") + ) + return builder.getOrCreate() + + +@pytest.fixture(scope="package") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__, with_spark=True) + cluster.add_instance( + "node1", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + ], 
+ user_configs=["configs/users.d/users.xml"], + with_minio=True, + with_azurite=True, + stay_alive=True, + ) + + cluster.start() + + prepare_s3_bucket(cluster) + + cluster.spark_session = get_spark() + + cluster.default_s3_uploader = S3Uploader(cluster.minio_client, cluster.minio_bucket) + cluster.default_s3_downloader = S3Downloader(cluster.minio_client, cluster.minio_bucket) + + cluster.azure_container_name = "mycontainer" + cluster.blob_service_client.create_container(cluster.azure_container_name) + cluster.default_azure_uploader = AzureUploader(cluster.blob_service_client, cluster.azure_container_name) + + cluster.default_local_uploader = LocalUploader(cluster.instances["node1"]) + cluster.default_local_downloader = LocalDownloader(cluster.instances["node1"]) + + # Create extra S3 buckets for test_four_different_locations + for i in range(1, 4): + bucket_name = f"{cluster.minio_bucket}-storage{i}" + if not cluster.minio_client.bucket_exists(bucket_name): + cluster.minio_client.make_bucket(bucket_name) + + yield cluster + + finally: + cluster.shutdown() + + +def modify_avro_file(avro_path: str, field_path: list, modifier_func) -> None: + """ + Modify a field in an AVRO file, preserving the rest of it as is. + + field_path: list of keys to navigate to the field + modifier_func: function that takes old value and returns new value + """ + with open(avro_path, 'rb') as f: + reader = avro.datafile.DataFileReader(f, avro.io.DatumReader()) + schema = reader.datum_reader.writers_schema + # Preserve all file metadata (partition-spec, format-version, etc.) 
+ metadata = dict(reader.meta) + records = list(reader) + reader.close() + + for record in records: + obj = record + for key in field_path[:-1]: + if obj is None or key not in obj: + break + obj = obj[key] + else: + if obj and field_path[-1] in obj: + obj[field_path[-1]] = modifier_func(obj[field_path[-1]]) + + with open(avro_path, 'wb') as f: + writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema) + for key, value in metadata.items(): + if not key.startswith('avro.'): + writer.set_meta(key, value) + for record in records: + writer.append(record) + writer.close() + + +def get_absolute_path(storage_type: str, cluster, relative_path: str) -> str: + """Convert relative path to absolute path for given storage type.""" + relative_path = relative_path.lstrip("/") + + if storage_type == "s3": + return f"s3a://{cluster.minio_bucket}/{relative_path}" + elif storage_type.startswith("s3:"): # s3:bucket_name format + bucket = storage_type.split(":")[1] + return f"s3a://{bucket}/{relative_path}" + elif storage_type == "azure": + return f"abfs://{cluster.azure_container_name}@{cluster.azurite_account}/{relative_path}" + elif storage_type.startswith("azure:"): # azure:container_name format + container = storage_type.split(":")[1] + return f"abfs://{container}@{cluster.azurite_account}/{relative_path}" + elif storage_type == "local": + return f"file:///{relative_path}" + else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_uploader(storage_type: str, cluster): + if storage_type == "s3": + return cluster.default_s3_uploader + elif storage_type.startswith("s3:"): + bucket = storage_type.split(":")[1] + return S3Uploader(cluster.minio_client, bucket) + elif storage_type == "azure": + return cluster.default_azure_uploader + elif storage_type.startswith("azure:"): + container = storage_type.split(":")[1] + return AzureUploader(cluster.blob_service_client, container) + elif storage_type == "local": + return cluster.default_local_uploader + 
else: + raise ValueError(f"Unknown storage type: {storage_type}") + + +def get_table_function(metadata_storage: str): + if metadata_storage == "s3" or metadata_storage.startswith("s3:"): + return "icebergS3" + elif metadata_storage == "azure" or metadata_storage.startswith("azure:"): + return "icebergAzure" + elif metadata_storage == "local": + return "icebergLocal" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def get_query_args(metadata_storage: str, cluster, table_path: str): + """Get query arguments for the iceberg table function.""" + minio_url = f"http://{cluster.minio_host}:{cluster.minio_port}" + if metadata_storage == "s3": + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{cluster.minio_bucket}/'" + elif metadata_storage.startswith("s3:"): + bucket = metadata_storage.split(":")[1] + return f"s3, filename='{table_path}/', format=Parquet, url='{minio_url}/{bucket}/'" + elif metadata_storage == "azure": + return f"azure, container='{cluster.azure_container_name}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage.startswith("azure:"): + container = metadata_storage.split(":")[1] + return f"azure, container='{container}', storage_account_url='{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', blob_path='{table_path}/', format=Parquet" + elif metadata_storage == "local": + return f"local, path='/{table_path}', format=Parquet" + else: + raise ValueError(f"Unknown storage type: {metadata_storage}") + + +def find_files(directory: str, suffix: str) -> list: + """Find files ending with given suffix.""" + result = [] + for root, _, files in os.walk(directory): + for f in files: + if f.endswith(suffix): + result.append(os.path.join(root, f)) + return result + + +def path_modifier(old_path: str, new_storage: str, cluster, base_path: str): + """Create a new absolute path for a different storage location.""" + # 
Extract just the filename/relative portion + if "://" in old_path: + # Parse out the path part after protocol://bucket/ + parts = old_path.split("/") + # Find where the actual path starts (after bucket) + for i, part in enumerate(parts): + if base_path.split("/")[0] in part or "var" in part: + relative = "/".join(parts[i:]) + break + else: + relative = parts[-1] + else: + relative = old_path.lstrip("/") + + return get_absolute_path(new_storage, cluster, relative) + + +# ============================================================================= +# Tests +# ============================================================================= + +STORAGE_TYPES = ["s3", "azure", "local"] + +def _get_type_family(t): + if t.startswith("s3"): + return "s3" + elif t.startswith("azure"): + return "azure" + return t + +def _generate_valid_combinations(): + """ + Generate valid storage combinations. + Rule: all components must be same type family as metadata, OR local. + Local doesn't need credentials, so S3+local and Azure+local work. + But S3+Azure doesn't work (credentials aren't interchangeable). + """ + combinations = [] + for metadata in STORAGE_TYPES: + main_family = _get_type_family(metadata) + for manifest_list in STORAGE_TYPES: + if _get_type_family(manifest_list) not in (main_family, "local"): + continue + for manifest in STORAGE_TYPES: + if _get_type_family(manifest) not in (main_family, "local"): + continue + for data in STORAGE_TYPES: + if _get_type_family(data) not in (main_family, "local"): + continue + combinations.append((metadata, manifest_list, manifest, data)) + return combinations + +VALID_COMBINATIONS = _generate_valid_combinations() + +@pytest.mark.parametrize("metadata_storage,manifest_list_storage,manifest_storage,data_storage", VALID_COMBINATIONS) +def test_multi_storage_combinations(started_cluster, metadata_storage, manifest_list_storage, manifest_storage, data_storage): + """ + Test Iceberg table with all components in different storage locations. 
+ """ + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_combo_{get_uuid_str()}" + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, value STRING) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')") + + # Upload to default S3 first + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + # Download all files + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + # Step 1: Modify manifest files to point to data_storage + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + # Step 2: Modify manifest-list files to point to manifest_storage + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: path_modifier(p, manifest_storage, started_cluster, base_path)) + + # Step 3: Modify metadata.json to point to manifest_list_storage + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + + # Update snapshot manifest-list paths + if "snapshots" in data: + for snap in data["snapshots"]: + if 
"manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + # Step 4: Upload to respective storages + # Metadata files (*.metadata.json, version-hint.text) + meta_uploader = get_uploader(metadata_storage, started_cluster) + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + meta_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest-list files + ml_uploader = get_uploader(manifest_list_storage, started_cluster) + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + ml_uploader.upload_file(f, f"{base_path}/{rel}") + + # Manifest files + m_uploader = get_uploader(manifest_storage, started_cluster) + for f in manifest_files: + rel = os.path.relpath(f, host_path) + m_uploader.upload_file(f, f"{base_path}/{rel}") + + # Data files + d_uploader = get_uploader(data_storage, started_cluster) + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + d_uploader.upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + func = get_table_function(metadata_storage) + args = get_query_args(metadata_storage, started_cluster, base_path) + + assert instance.query(f"SELECT * FROM {func}({args}) ORDER BY id") == "1\talpha\n2\tbeta\n3\tgamma\n" + + +# S3 is the primary use case for cross-bucket access. +# Azure cross-container: not supported (account_key not extractable from credential object). 
+def test_four_different_s3_buckets(started_cluster): + """S3: each component in a different bucket (metadata, manifest-list, manifest, data).""" + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = f"test_four_buckets_{get_uuid_str()}" + buckets = [ + started_cluster.minio_bucket, + f"{started_cluster.minio_bucket}-storage1", + f"{started_cluster.minio_bucket}-storage2", + f"{started_cluster.minio_bucket}-storage3", + ] + + metadata_storage = f"s3:{buckets[0]}" + manifest_list_storage = f"s3:{buckets[1]}" + manifest_storage = f"s3:{buckets[2]}" + data_storage = f"s3:{buckets[3]}" + + uploaders = {f"s3:{b}": S3Uploader(started_cluster.minio_client, b) for b in buckets} + + spark.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING, score INT) USING iceberg OPTIONS('format-version'='2')") + spark.sql(f"INSERT INTO {TABLE_NAME} VALUES (1, 'Alice', 100), (2, 'Bob', 85), (3, 'Carol', 92)") + + default_upload_directory(started_cluster, "s3", f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/") + + temp_dir = tempfile.mkdtemp() + host_path = os.path.join(temp_dir, TABLE_NAME) + os.makedirs(host_path, exist_ok=True) + + default_download_directory(started_cluster, "s3", f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", host_path) + + base_path = f"var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}" + metadata_dir = os.path.join(host_path, "metadata") + data_dir = os.path.join(host_path, "data") + + manifest_files = [f for f in find_files(metadata_dir, ".avro") if not os.path.basename(f).startswith("snap-")] + for mf in manifest_files: + modify_avro_file(mf, ["data_file", "file_path"], + lambda p: path_modifier(p, data_storage, started_cluster, base_path)) + + manifest_list_files = [f for f in find_files(metadata_dir, ".avro") if os.path.basename(f).startswith("snap-")] + for ml in manifest_list_files: + modify_avro_file(ml, ["manifest_path"], + lambda p: 
path_modifier(p, manifest_storage, started_cluster, base_path)) + + for mj in find_files(metadata_dir, ".metadata.json"): + with open(mj, 'r') as f: + data = json.load(f) + data["location"] = get_absolute_path(metadata_storage, started_cluster, base_path) + if "snapshots" in data: + for snap in data["snapshots"]: + if "manifest-list" in snap: + snap["manifest-list"] = path_modifier(snap["manifest-list"], manifest_list_storage, started_cluster, base_path) + with open(mj, 'w') as f: + json.dump(data, f, indent=2) + + for f in find_files(metadata_dir, ".metadata.json") + find_files(metadata_dir, "version-hint.text"): + rel = os.path.relpath(f, host_path) + uploaders[metadata_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_list_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_list_storage].upload_file(f, f"{base_path}/{rel}") + + for f in manifest_files: + rel = os.path.relpath(f, host_path) + uploaders[manifest_storage].upload_file(f, f"{base_path}/{rel}") + + if os.path.exists(data_dir): + for f in find_files(data_dir, ".parquet"): + rel = os.path.relpath(f, host_path) + uploaders[data_storage].upload_file(f, f"{base_path}/{rel}") + + shutil.rmtree(temp_dir) + + minio_url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}" + result = instance.query(f"SELECT * FROM icebergS3(s3, filename='{base_path}/', format=Parquet, url='{minio_url}/{buckets[0]}/') ORDER BY id") + + assert result == "1\tAlice\t100\n2\tBob\t85\n3\tCarol\t92\n" \ No newline at end of file diff --git a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py index 5cb1c02a0c07..9a60da2b2301 100644 --- a/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py +++ b/tests/integration/test_storage_iceberg_schema_evolution/test_array_evolved_with_struct.py @@ -55,7 +55,7 @@ def 
execute_spark_query(query: str): execute_spark_query( f""" - INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('name', 'Singapore', 'zip', 12345), named_struct('name', 'Moscow', 'zip', 54321)), ARRAY(1,2)); + INSERT INTO {TABLE_NAME} VALUES (ARRAY(named_struct('city', 'Singapore', 'zip', 12345), named_struct('city', 'Moscow', 'zip', 54321)), ARRAY(1,2)); """ ) diff --git a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json index 8d367d20f041..a983881af8f0 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "d4b695ca-ceeb-4537-8a2a-eee90dc6e313", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test", + "location" : "s3a://test/field_ids_complex_test", "last-sequence-number" : 1, "last-updated-ms" : 1757661733693, "last-column-id" : 9, @@ -96,7 +96,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", + "manifest-list" : "s3a://test/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json index 2d149abb44e7..d6c9079228ac 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : 
"149ecc15-7afc-4311-86b3-3a4c8d4ec08e", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test", + "location" : "s3a://test/field_ids_struct_test", "last-sequence-number" : 1, "last-updated-ms" : 1753959190403, "last-column-id" : 6, @@ -84,7 +84,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", + "manifest-list" : "s3a://test/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json index 32225eb618ad..1ddc3492cc82 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "8f1f9ae2-18bb-421e-b640-ec2f85e67bce", - "location" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test", + "location" : "s3a://test/field_ids_table_test", "last-sequence-number" : 1, "last-updated-ms" : 1752481476160, "last-column-id" : 1, @@ -56,7 +56,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", + "manifest-list" : "s3a://test/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", "schema-id" : 0 } ], "statistics" : [ ], From 4cd644b3855b86ccda172aeedfed6df28f326365 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Wed, 20 May 2026 22:33:15 +0200 Subject: [PATCH 2/2] Fix build after PR 90740 
cherry-pick - src/Storages/ObjectStorage/Utils.cpp: drop leftover `S3UriStyle::AUTO` arguments from two `S3::URI` ctor calls (S3UriStyle does not exist on antalya-26.3; the S3UriStyle parameter was already dropped from the ctor signature in the cherry-pick). - src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp: `collectRetainedFiles` and `collectExpiredFiles` now pass a local empty `SecondaryStorages` to `getManifestList` / `getManifestFileEntriesHandle` so the new mandatory parameter compiles without threading external storages through the `Iceberg::expireSnapshots` dispatcher. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ObjectStorage/DataLakes/Iceberg/Mutations.cpp | 11 +++++++---- src/Storages/ObjectStorage/Utils.cpp | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index e9ee547b1b44..2bc79622d5c0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -1013,15 +1014,16 @@ static void collectRetainedFiles( String storage_manifest_list_path = getProperFilePathFromMetadataInfo( manifest_list_path, persistent_table_components.table_path, persistent_table_components.table_location); + SecondaryStorages local_secondary_storages; auto manifest_keys = getManifestList( - object_storage, persistent_table_components, context, storage_manifest_list_path, log); + object_storage, persistent_table_components, context, storage_manifest_list_path, log, local_secondary_storages); for (const auto & mf_key : manifest_keys) { retained_manifest_paths.insert(mf_key.manifest_file_path); auto entries_handle = getManifestFileEntriesHandle( object_storage, persistent_table_components, context, log, - mf_key, current_schema_id); + mf_key, current_schema_id, 
local_secondary_storages); collectAllFilePaths(entries_handle, retained_data_file_paths); } } @@ -1064,10 +1066,11 @@ static ExpiredFiles collectExpiredFiles( ml_path, persistent_table_components.table_path, persistent_table_components.table_location); ManifestFileCacheKeys manifest_keys; + SecondaryStorages local_secondary_storages; try { manifest_keys = getManifestList( - object_storage, persistent_table_components, context, storage_ml_path, log); + object_storage, persistent_table_components, context, storage_ml_path, log, local_secondary_storages); } catch (...) { @@ -1087,7 +1090,7 @@ static ExpiredFiles collectExpiredFiles( { auto entries_handle = getManifestFileEntriesHandle( object_storage, persistent_table_components, context, log, - mf_key, current_schema_id); + mf_key, current_schema_id, local_secondary_storages); for (const auto & entry : entries_handle.getFilesWithoutDeleted(FileContentType::DATA)) if (!retained_data_file_paths.contains(entry->file_path)) diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 0f912ae779fc..7e19d7ff668c 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -552,7 +552,7 @@ std::pair resolveObjectStorageForPath( /// Paths from metadata already have correct encoding; disable Poco::URI /// percent-decoding so that keys like `col=12%3A00%3A00` are preserved as-is. 
S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, - /*keep_presigned_query_parameters*/ true, /*uri_style*/ S3UriStyle::AUTO, + /*keep_presigned_query_parameters*/ true, /*enable_url_encoding*/ false); std::string key_to_use = s3_uri.key; @@ -582,7 +582,7 @@ std::pair resolveObjectStorageForPath( normalized_table_location = "gs://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; } S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, - /*keep_presigned_query_parameters*/ true, /*uri_style*/ S3UriStyle::AUTO, + /*keep_presigned_query_parameters*/ true, /*enable_url_encoding*/ false); if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized))