Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
partition_key_value.emplace_back(value);

std::unordered_map<Int32, ColumnInfo> columns_infos;
bool any_stats_field_present = false;

for (const auto & path : {c_data_file_value_counts, c_data_file_column_sizes, c_data_file_null_value_counts})
{
if (hasPath(path))
{
any_stats_field_present = true;
Field values_count = getValueFromRowByName(row_index, path);
for (const auto & column_stats : values_count.safeGet<Array>())
{
Expand Down Expand Up @@ -252,6 +254,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
partition_key_value,
columns_infos,
value_for_bounds,
any_stats_field_present,
file_format,
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
Expand Down Expand Up @@ -297,6 +300,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
partition_key_value,
columns_infos,
value_for_bounds,
any_stats_field_present,
file_format,
lower_reference_data_file_path,
upper_reference_data_file_path,
Expand Down Expand Up @@ -328,6 +332,7 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
partition_key_value,
columns_infos,
value_for_bounds,
any_stats_field_present,
file_format,
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ DataFileMetaInfo::DataFileMetaInfo(
Int32 table_schema_id,
Int32 file_schema_id,
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_,
const std::unordered_map<Int32, std::pair<Field, Field>> & value_bounds_)
const std::unordered_map<Int32, std::pair<Field, Field>> & value_bounds_,
bool any_stats_field_present)
: stats_were_read(any_stats_field_present)
{
#if USE_AVRO
std::vector<Int32> column_ids;
Expand Down Expand Up @@ -180,6 +182,10 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info)

auto log = getLogger("DataFileMetaInfo");

// Missing field means old coordinator — default to false (safe: no absent-NULL).
if (file_info->has("stats_were_read"))
stats_were_read = static_cast<bool>(file_info->get("stats_were_read").convert<bool>());

if (file_info->has("columns"))
{
auto columns = file_info->getArray("columns");
Expand Down Expand Up @@ -225,6 +231,8 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const
{
Poco::JSON::Object::Ptr file_info = new Poco::JSON::Object();

file_info->set("stats_were_read", stats_were_read);

if (!columns_info.empty())
{
Poco::JSON::Array::Ptr columns = new Poco::JSON::Array();
Expand Down Expand Up @@ -256,6 +264,7 @@ constexpr size_t FIELD_MASK_ALL = 0x7;

void DataFileMetaInfo::serialize(WriteBuffer & out) const
{
writeIntBinary(static_cast<UInt8>(stats_were_read), out);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this break backward compatibility?

auto size = columns_info.size();
writeIntBinary(size, out);
for (const auto & column : columns_info)
Expand Down Expand Up @@ -286,6 +295,10 @@ DataFileMetaInfo DataFileMetaInfo::deserialize(ReadBuffer & in)
{
DataFileMetaInfo result;

UInt8 stats_were_read_uint;
readIntBinary(stats_were_read_uint, in);
result.stats_were_read = static_cast<bool>(stats_were_read_uint);

size_t size;
readIntBinary(size, in);

Expand Down
19 changes: 18 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,36 @@ class DataFileMetaInfo
// deserialized using those types. After schema evolution (e.g. `int` -> `long`)
// the two ids differ, and using the table schema's type would misinterpret the
// bytes and produce a garbage hyperrectangle.
// any_stats_field_present must be true when at least one of the three per-column
// stats fields (value_counts, column_sizes, null_value_counts) was present in the
// manifest Avro writer schema. When false, the absent-NULL optimization is
// suppressed: we cannot distinguish a schema-evolution absent column from a column
// that merely has no stats written, so letting Parquet handle it is the only safe
// choice.
explicit DataFileMetaInfo(
const Iceberg::IcebergSchemaProcessor & schema_processor,
Int32 table_schema_id,
Int32 file_schema_id,
const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_,
const std::unordered_map<Int32, std::pair<Field, Field>> & value_bounds_);
const std::unordered_map<Int32, std::pair<Field, Field>> & value_bounds_,
bool any_stats_field_present);

void serialize(WriteBuffer & out) const;
static DataFileMetaInfo deserialize(ReadBuffer & in);

bool empty() const { return columns_info.empty(); }

std::unordered_map<std::string, ColumnInfo> columns_info;

// True when the manifest Avro schema included at least one of value_counts,
// column_sizes, or null_value_counts. columns_info is authoritative only when
// this is true: a column absent from columns_info was not written to the data
// file and may be substituted with constant NULL.
//
// Serialization note: old nodes do not write this field. On deserialization of
// a missing field (JSON or binary), default to false so that the absent-NULL
// optimization is skipped entirely — safe fallback, minor optimization loss only.
bool stats_were_read = false;
};

using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ ObjectInfoPtr IcebergIterator::next(size_t)
table_schema_id, /// current schema id to use current column names
manifest_file_entry->resolved_schema_id, /// file's schema id to interpret value_bounds bytes
manifest_file_entry->parsed_entry->columns_infos,
manifest_file_entry->parsed_entry->value_bounds));
manifest_file_entry->parsed_entry->value_bounds,
manifest_file_entry->parsed_entry->any_stats_field_present));
ProfileEvents::increment(ProfileEvents::IcebergMetadataReturnedObjectInfos);
return object_info;
}
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct ParsedManifestFileEntry : boost::noncopyable
DB::Row partition_key_value;
std::unordered_map<Int32, ColumnInfo> columns_infos;
std::unordered_map<Int32, std::pair<Field, Field>> value_bounds;
bool any_stats_field_present = false;

String file_format;
std::optional<String> lower_reference_data_file_path; // For position delete files only.
Expand All @@ -110,6 +111,7 @@ struct ParsedManifestFileEntry : boost::noncopyable
DB::Row partition_key_value_,
std::unordered_map<Int32, ColumnInfo> columns_infos_,
std::unordered_map<Int32, std::pair<Field, Field>> value_bounds_,
bool any_stats_field_present_,
String file_format_,
std::optional<String> lower_reference_data_file_path_,
std::optional<String> upper_reference_data_file_path_,
Expand All @@ -126,6 +128,7 @@ struct ParsedManifestFileEntry : boost::noncopyable
, partition_key_value(std::move(partition_key_value_))
, columns_infos(std::move(columns_infos_))
, value_bounds(std::move(value_bounds_))
, any_stats_field_present(any_stats_field_present_)
, file_format(std::move(file_format_))
, lower_reference_data_file_path(std::move(lower_reference_data_file_path_))
, upper_reference_data_file_path(std::move(upper_reference_data_file_path_))
Expand Down
63 changes: 30 additions & 33 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,40 +886,37 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
}
}
}
for (const auto & column : requested_columns_list)
if (file_meta_data.value()->stats_were_read)
{
const auto & column_name = column.first;

if (file_meta_data.value()->columns_info.contains(column_name))
continue;

if (!column.second.second.type->isNullable())
continue;

/// With View over Iceberg table we have someting like 'materialize(time)' as column_name
/// Simple cheap check
if (column_name.starts_with("materialize(") && column_name.ends_with(")"))
continue;

/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;

/// Column is nullable and absent in file
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);

LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
for (const auto & column : requested_columns_list)
{
const auto & column_name = column.first;
if (file_meta_data.value()->columns_info.contains(column_name))
continue;
if (!column.second.second.type->isNullable())
continue;
/// With View over Iceberg table we have someting like 'materialize(time)' as column_name
/// Simple cheap check
if (column_name.starts_with("materialize(") && column_name.ends_with(")"))
continue;
/// Skip columns produced by prewhere or row-level filter expressions —
/// they are computed at read time, not stored in the file.
if (format_filter_info
&& ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name)
|| (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name)))
continue;
/// Column is nullable and absent in file
constant_columns_with_values[column.second.first] =
ConstColumnWithValue{
column.second.second,
Field()
};
constant_columns.insert(column_name);
LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'",
object_info->getPath(),
column_name,
column.second.second.type);
}
}
}

Expand Down
Loading
Loading