Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/docker/integration/runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.amazonaws:aws-java-sdk-bundle:1.12.262,\
org.apache.hadoop:hadoop-azure:3.3.4,\
com.microsoft.azure:azure-storage:8.6.6,\
org.apache.spark:spark-avro_2.12:3.5.1"\
&& /spark-3.5.5-bin-hadoop3/bin/spark-shell --packages "$packages" \
&& find /root/.ivy2/ -name '*.jar' -exec ln -sf {} /spark-3.5.5-bin-hadoop3/jars/ \;
Expand Down
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_META
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH = 7;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_ABSOLUTE_PATH;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ Use multiple threads for azure multipart upload.
)", 0) \
DECLARE(Bool, s3_throw_on_zero_files_match, false, R"(
Throw an error, when ListObjects request cannot match any files
)", 0) \
DECLARE(Bool, s3_propagate_credentials_to_other_storages, false, R"(
Credentials from the base storage are always propagated to secondary object storages when endpoints match. When this setting is enabled, credentials are also propagated when endpoints differ, including less secure connections (for example, from `https` to plain `http`).
)", 0) \
DECLARE(Bool, hdfs_throw_on_zero_files_match, false, R"(
Throw an error if matched zero files according to glob expansion rules.
Expand Down
5 changes: 5 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"output_format_parquet_use_custom_encoder", true, true, "Obsolete setting, the custom encoder is now always used."},
{"output_format_parquet_version", "2.latest", "2.latest", "Obsolete setting, the custom encoder always writes Parquet V2.6+."},
{"output_format_parquet_compliant_nested_types", true, true, "Obsolete setting, the custom encoder always uses compliant nested types."},
{"input_format_parquet_use_native_reader_v3", true, true, "Obsolete setting, the native reader v3 is now always used."},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
LOG_DEBUG(log, "Has no credentials");
}
}
else if (!lightweight && table_metadata.requiresCredentials() && std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end())
else if (!lightweight && table_metadata.requiresCredentials()
&& std::find(vended_credentials_catalogs.begin(), vended_credentials_catalogs.end(), catalog->getCatalogType()) == vended_credentials_catalogs.end()
&& table_metadata.getStorageType() != DatabaseDataLakeStorageType::Local)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
Expand Down
28 changes: 11 additions & 17 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@
#include <DataTypes/DataTypesNumber.h>


#include <IO/S3/Credentials.h>
#include <IO/S3/Client.h>
#include <IO/S3Settings.h>
#include <Databases/DataLake/Common.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Databases/DataLake/Common.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <IO/S3/Client.h>
#include <IO/S3/Credentials.h>
#include <IO/S3Settings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Common/ProxyConfigurationResolverProvider.h>

namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -554,14 +555,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
try
{
auto [metadata_version, metadata_path, compression_method] = DB::Iceberg::getLatestOrExplicitMetadataFileAndVersion(
object_storage,
table_path,
*storage_settings,
nullptr,
getContext(),
log.get(),
std::nullopt
);
object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None);

LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location);

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran

ObjectMetadata ReadBufferFromAzureBlobStorage::getObjectMetadataFromTheLastRequest() const
{
if (last_object_metadata.get()->has_value())
if (!last_object_metadata.get()->has_value())
throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests");

return last_object_metadata.get()->value();
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ namespace DB

struct URIConverter
{
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper, bool enable_url_encoding = true)
{
Macros macros({{"bucket", uri.getHost()}});
uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
uri = macros.expand(mapper[uri.getScheme()]).empty()
? uri
: Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding);
}
};

Expand All @@ -32,7 +34,7 @@ namespace ErrorCodes
namespace S3
{

URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_presigned_query_parameters, bool enable_url_encoding)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
Expand All @@ -54,9 +56,9 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
else
uri_str = uri_;

uri = Poco::URI(uri_str);
uri = Poco::URI(uri_str, enable_url_encoding);
/// Keep a copy of how Poco parsed the original string before any mapping
Poco::URI original_uri(uri_str);
Poco::URI original_uri(uri_str, enable_url_encoding);
bool looks_like_presigned = false;
for (const auto & [qk, qv] : original_uri.getQueryParameters())
{
Expand Down Expand Up @@ -101,7 +103,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
URIConverter::modifyURI(uri, mapper, enable_url_encoding);
}

storage_name = "S3";
Expand Down
3 changes: 2 additions & 1 deletion src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct URI
explicit URI(
const std::string & uri_,
bool allow_archive_path_syntax = false,
bool keep_presigned_query_parameters = true);
bool keep_presigned_query_parameters = true,
bool enable_url_encoding = true);
void addRegionToURI(const std::string & region);

static void validateBucket(const std::string & bucket, const Poco::URI & uri);
Expand Down
6 changes: 2 additions & 4 deletions src/Interpreters/ClusterFunctionReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
data_lake_metadata = object->data_lake_metadata.value();

#if USE_AVRO
if (std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
{
iceberg_info = dynamic_cast<IcebergDataObjectInfo &>(*object).info;
}
if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object))
iceberg_info = iceberg_object->info;
#endif

file_meta_info = object->relative_path_with_metadata.file_meta_info;
Expand Down
10 changes: 7 additions & 3 deletions src/Interpreters/IcebergMetadataLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status)
{
Expand All @@ -102,13 +102,17 @@ void insertRowToLogTable(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Iceberg metadata log table is not configured");
}

String normalized_table_path = table_path;
while (normalized_table_path.size() > 1 && normalized_table_path.back() == '/')
normalized_table_path.pop_back();

iceberg_metadata_log->add(
DB::IcebergMetadataLogElement{
.current_time = spec.tv_sec,
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
.table_path = normalized_table_path,
.file_path = file_path.serialize(),
.metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/IcebergMetadataLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/SettingsEnums.h>
#include <Interpreters/SystemLog.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>

namespace DB
Expand Down Expand Up @@ -33,7 +34,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromString.h>
#include <Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Common/UniqueLock.h>
Expand Down Expand Up @@ -35,7 +36,9 @@ namespace DB::Iceberg
using namespace DB;

AvroForIcebergDeserializer::AvroForIcebergDeserializer(
std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
std::unique_ptr<ReadBufferFromFileBase> buffer_,
const IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings)
try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
Expand Down Expand Up @@ -155,8 +158,8 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
}


const auto file_path_key = getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>();
const auto file_path_from_metadata = IcebergPathFromMetadata::deserialize(
getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>());
/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
/// ...
Expand Down Expand Up @@ -244,7 +247,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
case FileContentType::DATA: {
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::DATA,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand All @@ -262,16 +265,18 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
case FileContentType::POSITION_DELETE: {
/// reference_file_path can be absent in schema for some reason, though it is present in specification: https://iceberg.apache.org/spec/#manifests
std::optional<String> lower_reference_data_file_path = std::nullopt;
std::optional<String> upper_reference_data_file_path = std::nullopt;
std::optional<Iceberg::IcebergPathFromMetadata> lower_reference_data_file_path;
std::optional<Iceberg::IcebergPathFromMetadata> upper_reference_data_file_path;
bool bounds_set_by_referenced_data_file = false;
if (hasPath(c_data_file_referenced_data_file))
{
Field reference_file_path_field = getValueFromRowByName(row_index, c_data_file_referenced_data_file);
if (!reference_file_path_field.isNull())
{
lower_reference_data_file_path = reference_file_path_field.safeGet<String>();
upper_reference_data_file_path = reference_file_path_field.safeGet<String>();
lower_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
upper_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
bounds_set_by_referenced_data_file = true;
}
}
Expand All @@ -282,14 +287,14 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
{
auto & [lower, upper] = it->second;
if (!lower.isNull())
lower_reference_data_file_path = lower.safeGet<String>();
lower_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(lower.safeGet<String>()));
if (!upper.isNull())
upper_reference_data_file_path = upper.safeGet<String>();
upper_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(upper.safeGet<String>()));
}
}
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::POSITION_DELETE,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand Down Expand Up @@ -320,7 +325,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
c_data_file_equality_ids);
return std::make_shared<const ParsedManifestFileEntry>(
FileContentType::EQUALITY_DELETE,
file_path_key,
file_path_from_metadata,
row_index,
status,
sequence_number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Common/SharedMutex.h>


Expand Down Expand Up @@ -36,7 +37,7 @@ class AvroForIcebergDeserializer
{
private:
std::unique_ptr<DB::ReadBufferFromFileBase> buffer;
std::string manifest_file_path;
Iceberg::IcebergPathFromMetadata manifest_file_path;
DB::ColumnPtr parsed_column;
std::shared_ptr<const DB::DataTypeTuple> parsed_column_data_type;
mutable std::optional<ColumnsWithTypeAndName> cache_parsed_columns TSA_GUARDED_BY(cache_mutex);
Expand All @@ -61,7 +62,7 @@ class AvroForIcebergDeserializer
public:
AvroForIcebergDeserializer(
std::unique_ptr<DB::ReadBufferFromFileBase> buffer_,
const std::string & manifest_file_path_,
const Iceberg::IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings);

size_t rows() const;
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// Returns the full table location URI (e.g. `s3a://bucket/prefix/table/`)
virtual std::string getTableLocation() const { return {}; }

/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Loading
Loading