Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/src/arrow/array/array_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,18 @@ class ARROW_EXPORT Array {
/// \return const std::shared_ptr<ArrayStatistics>&
const std::shared_ptr<ArrayStatistics>& statistics() const { return data_->statistics; }

/// \brief Create a statistics array of this array
///
/// The created array follows the C data interface statistics
/// specification. See
/// https://arrow.apache.org/docs/format/StatisticsSchema.html
/// for details.
///
/// This array is reported as column 0 together with its exact row
/// count (its length). Nested child arrays follow in depth-first
/// order, each one taking the next column index.
///
/// \param[in] pool the memory pool to allocate memory from
/// \return the statistics array of this array
Result<std::shared_ptr<Array>> MakeStatisticsArray(
MemoryPool* pool = default_memory_pool()) const;

protected:
Array() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(Array);
Expand Down
224 changes: 122 additions & 102 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,118 +512,120 @@ struct EnumeratedStatistics {
};
using OnStatistics =
std::function<Status(const EnumeratedStatistics& enumerated_statistics)>;
Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) {
EnumeratedStatistics statistics;
statistics.nth_statistics = 0;
statistics.start_new_column = true;
statistics.nth_column = std::nullopt;

statistics.key = ARROW_STATISTICS_KEY_ROW_COUNT_EXACT;
statistics.type = int64();
statistics.value = record_batch.num_rows();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;

const auto& schema = record_batch.schema();
const auto num_fields = schema->num_fields();
for (int nth_column = 0; nth_column < num_fields; ++nth_column) {
const auto& field = schema->field(nth_column);
auto column_statistics = record_batch.column(nth_column)->statistics();
if (!column_statistics) {
continue;
}

statistics.start_new_column = true;
Status EnumerateOneStatistics(std::optional<int32_t> nth_column,
const std::shared_ptr<DataType>& array_type,
const std::shared_ptr<ArrayStatistics>& column_statistics,
int* nth_statistics, OnStatistics on_statistics,
std::optional<int64_t> exact_row_count = std::nullopt) {
bool start_new_column = true;
auto emit = [&](const char* key, const std::shared_ptr<DataType>& type,
ArrayStatistics::ValueType value) {
EnumeratedStatistics statistics;
statistics.nth_statistics = (*nth_statistics)++;
statistics.start_new_column = start_new_column;
statistics.nth_column = nth_column;
if (column_statistics->null_count.has_value()) {
statistics.nth_statistics++;
if (std::holds_alternative<int64_t>(column_statistics->null_count.value())) {
statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_EXACT;
statistics.type = int64();
statistics.value = std::get<int64_t>(column_statistics->null_count.value());
RETURN_NOT_OK(on_statistics(statistics));
} else {
statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE;
statistics.type = float64();
statistics.value = std::get<double>(column_statistics->null_count.value());
RETURN_NOT_OK(on_statistics(statistics));
}
statistics.start_new_column = false;
statistics.key = key;
statistics.type = type;
statistics.value = std::move(value);
RETURN_NOT_OK(on_statistics(statistics));
start_new_column = false;
return Status::OK();
};
auto emit_count = [&](const ArrayStatistics::CountType& value, const char* exact_key,
const char* approximate_key) {
if (std::holds_alternative<int64_t>(value)) {
return emit(exact_key, int64(), std::get<int64_t>(value));
}
return emit(approximate_key, float64(), std::get<double>(value));
};

if (column_statistics->distinct_count.has_value()) {
statistics.nth_statistics++;
if (std::holds_alternative<int64_t>(column_statistics->distinct_count.value())) {
statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT;
statistics.type = int64();
statistics.value = std::get<int64_t>(column_statistics->distinct_count.value());
} else {
statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE;
statistics.type = float64();
statistics.value = std::get<double>(column_statistics->distinct_count.value());
}
if (exact_row_count.has_value()) {
RETURN_NOT_OK(
emit(ARROW_STATISTICS_KEY_ROW_COUNT_EXACT, int64(), exact_row_count.value()));
}
if (!column_statistics) {
return Status::OK();
}

RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
if (column_statistics->null_count.has_value()) {
RETURN_NOT_OK(emit_count(column_statistics->null_count.value(),
ARROW_STATISTICS_KEY_NULL_COUNT_EXACT,
ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE));
}

if (column_statistics->max_byte_width.has_value()) {
statistics.nth_statistics++;
if (std::holds_alternative<int64_t>(column_statistics->max_byte_width.value())) {
statistics.key = ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT;
statistics.type = int64();
statistics.value = std::get<int64_t>(column_statistics->max_byte_width.value());
} else {
statistics.key = ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE;
statistics.type = float64();
statistics.value = std::get<double>(column_statistics->max_byte_width.value());
}
if (column_statistics->distinct_count.has_value()) {
RETURN_NOT_OK(emit_count(column_statistics->distinct_count.value(),
ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT,
ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE));
}

RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
if (column_statistics->max_byte_width.has_value()) {
RETURN_NOT_OK(emit_count(column_statistics->max_byte_width.value(),
ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT,
ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE));
}

if (column_statistics->average_byte_width.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_average_byte_width_exact) {
statistics.key = ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE;
}
statistics.type = float64();
statistics.value = column_statistics->average_byte_width.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
if (column_statistics->average_byte_width.has_value()) {
RETURN_NOT_OK(emit(column_statistics->is_average_byte_width_exact
? ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT
: ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE,
float64(), column_statistics->average_byte_width.value()));
}

if (column_statistics->min.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_min_exact) {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MinArrowType(field->type());
statistics.value = column_statistics->min.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
if (column_statistics->min.has_value()) {
RETURN_NOT_OK(emit(
column_statistics->is_min_exact ? ARROW_STATISTICS_KEY_MIN_VALUE_EXACT
: ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE,
column_statistics->MinArrowType(array_type), column_statistics->min.value()));
}

if (column_statistics->max.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_max_exact) {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MaxArrowType(field->type());
statistics.value = column_statistics->max.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
if (column_statistics->max.has_value()) {
RETURN_NOT_OK(emit(
column_statistics->is_max_exact ? ARROW_STATISTICS_KEY_MAX_VALUE_EXACT
: ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE,
column_statistics->MaxArrowType(array_type), column_statistics->max.value()));
}

return Status::OK();
}

// Enumerate the statistics attached to `data` and, recursively, to each of
// its non-null children.
//
// Every visited ArrayData consumes one column index from `*nth_column`,
// whether or not it carries statistics, so the indexes follow a pre-order,
// depth-first walk of the nested array.
//
// \param[in] data the array data to visit
// \param[in,out] nth_column running column index shared with the caller
// \param[in,out] nth_statistics running statistics counter shared with the
//   caller
// \param[in] on_statistics callback invoked for each enumerated statistic
// \param[in] include_exact_row_count when true, the length of `data` is
//   reported as an exact row count for `data` itself; children never
//   report one
// \return the first non-OK status produced while enumerating, otherwise OK
Status EnumerateArrayStatistics(const std::shared_ptr<ArrayData>& data,
                                int32_t* nth_column, int* nth_statistics,
                                OnStatistics on_statistics,
                                bool include_exact_row_count = false) {
  // Claim this array's column index before descending so that a parent is
  // always numbered ahead of its children.
  const auto current_column = (*nth_column)++;
  RETURN_NOT_OK(EnumerateOneStatistics(
      current_column, data->type, data->statistics, nth_statistics, on_statistics,
      include_exact_row_count ? std::optional<int64_t>(data->length) : std::nullopt));
  for (const auto& child_data : data->child_data) {
    if (child_data) {
      RETURN_NOT_OK(EnumerateArrayStatistics(child_data, nth_column, nth_statistics,
                                             on_statistics));
    }
  }
  return Status::OK();
}

// Enumerate the statistics of a standalone array.
//
// The array itself is visited first and also reports its length as an exact
// row count; both running counters start from zero.
Status EnumerateStatistics(const Array& array, OnStatistics on_statistics) {
  int32_t column_counter = 0;
  int statistics_counter = 0;
  constexpr bool kIncludeExactRowCount = true;
  return EnumerateArrayStatistics(array.data(), &column_counter, &statistics_counter,
                                  std::move(on_statistics), kIncludeExactRowCount);
}

// Enumerate the statistics of a record batch.
//
// A record-batch-level entry (no column index) carrying the exact row count
// is emitted first. Each column is then visited in order, sharing the same
// running column and statistics counters.
Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) {
  int statistics_counter = 0;

  // Record-batch-level entry: no column index and no per-array statistics,
  // only the exact row count.
  const std::optional<int64_t> exact_row_count = record_batch.num_rows();
  RETURN_NOT_OK(EnumerateOneStatistics(std::nullopt, null(), nullptr,
                                       &statistics_counter, on_statistics,
                                       exact_row_count));

  int32_t column_counter = 0;
  for (const auto& batch_column : record_batch.columns()) {
    const auto& column_data = batch_column->data();
    RETURN_NOT_OK(EnumerateArrayStatistics(column_data, &column_counter,
                                           &statistics_counter, on_statistics));
  }
  return Status::OK();
}
struct StringBuilderVisitor {
template <typename DataType>
enable_if_has_string_view<DataType, Status> Visit(const DataType&,
Expand All @@ -641,8 +643,9 @@ struct StringBuilderVisitor {
};
} // namespace

Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
MemoryPool* memory_pool) const {
template <typename EnumerateStatisticsFunction>
Result<std::shared_ptr<Array>> MakeStatisticsArrayFromEnumerator(
EnumerateStatisticsFunction enumerate_statistics, MemoryPool* memory_pool) {
Comment on lines +646 to +648
// Statistics schema:
// struct<
// column: int32,
Expand All @@ -660,7 +663,7 @@ Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
// needed value types. The following block collects these types.
std::vector<std::shared_ptr<Field>> values_types;
std::vector<int8_t> values_type_indexes;
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
RETURN_NOT_OK(enumerate_statistics([&](const EnumeratedStatistics& statistics) {
int8_t i = 0;
for (const auto& field : values_types) {
if (field->type()->Equals(statistics.type)) {
Expand Down Expand Up @@ -733,7 +736,7 @@ Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
StructBuilder builder(statistics_type, memory_pool, std::move(field_builders));

// Append statistics.
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
RETURN_NOT_OK(enumerate_statistics([&](const EnumeratedStatistics& statistics) {
if (statistics.start_new_column) {
RETURN_NOT_OK(builder.Append());
if (statistics.nth_column.has_value()) {
Expand Down Expand Up @@ -777,6 +780,23 @@ Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
return builder.Finish();
}

// Build the statistics array for this array by handing the shared builder a
// callable that enumerates this array's statistics.
Result<std::shared_ptr<Array>> Array::MakeStatisticsArray(MemoryPool* memory_pool) const {
  auto enumerate_this_array = [this](OnStatistics on_statistics) {
    return EnumerateStatistics(*this, std::move(on_statistics));
  };
  return MakeStatisticsArrayFromEnumerator(std::move(enumerate_this_array), memory_pool);
}

// Build the statistics array for this record batch by handing the shared
// builder a callable that enumerates this record batch's statistics.
Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
    MemoryPool* memory_pool) const {
  auto enumerate_this_batch = [this](OnStatistics on_statistics) {
    return EnumerateStatistics(*this, std::move(on_statistics));
  };
  return MakeStatisticsArrayFromEnumerator(std::move(enumerate_this_batch), memory_pool);
}

// Validate this record batch with full validation disabled.
Status RecordBatch::Validate() const {
  constexpr bool kFullValidation = false;
  return ValidateBatch(*this, kFullValidation);
}
Expand Down
93 changes: 93 additions & 0 deletions cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,99 @@ TEST_F(TestRecordBatch, MakeStatisticsArrayRowCount) {
AssertArraysEqual(*expected_statistics_array, *statistics_array, true);
}

// Array::MakeStatisticsArray() on a flat int32 array: the array is reported as
// column 0 and its exact row count precedes the attached statistics.
TEST_F(TestRecordBatch, MakeStatisticsArrayForArray) {
  // Statistics to attach: null count, distinct count and exact min/max.
  auto statistics = std::make_shared<ArrayStatistics>();
  statistics->null_count = int64_t{1};
  statistics->distinct_count = int64_t{2};
  statistics->max = int64_t{1};
  statistics->is_max_exact = true;
  statistics->min = int64_t{-1};
  statistics->is_min_exact = true;

  // Attach them to a copy of the parsed array's data.
  auto data = ArrayFromJSON(int32(), "[1, null, -1]")->data()->Copy();
  data->statistics = std::move(statistics);
  auto array = MakeArray(std::move(data));

  ASSERT_OK_AND_ASSIGN(auto actual, array->MakeStatisticsArray());

  ASSERT_OK_AND_ASSIGN(auto expected,
                       MakeStatisticsArray("[0]",
                                           {{
                                               ARROW_STATISTICS_KEY_ROW_COUNT_EXACT,
                                               ARROW_STATISTICS_KEY_NULL_COUNT_EXACT,
                                               ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT,
                                               ARROW_STATISTICS_KEY_MIN_VALUE_EXACT,
                                               ARROW_STATISTICS_KEY_MAX_VALUE_EXACT,
                                           }},
                                           {{
                                               ArrayStatistics::ValueType{int64_t{3}},
                                               ArrayStatistics::ValueType{int64_t{1}},
                                               ArrayStatistics::ValueType{int64_t{2}},
                                               ArrayStatistics::ValueType{int64_t{-1}},
                                               ArrayStatistics::ValueType{int64_t{1}},
                                           }}));
  AssertArraysEqual(*expected, *actual, true);
}

// RecordBatch::MakeStatisticsArray() on a struct column: the record batch
// entry has a null column index, the struct column is column 0 and its two
// children are columns 1 and 2.
TEST_F(TestRecordBatch, MakeStatisticsArrayNestedFields) {
  auto struct_type = struct_({field("a", int32()), field("b", float64())});
  auto data =
      ArrayFromJSON(struct_type,
                    R"([{"a": 1, "b": 1.5}, {"a": 2, "b": null}, {"a": 3, "b": 3.5}])")
          ->data()
          ->Copy();

  // Statistics of the struct column itself.
  auto struct_statistics = std::make_shared<ArrayStatistics>();
  struct_statistics->null_count = int64_t{0};
  data->statistics = std::move(struct_statistics);

  // Statistics of child "a", attached to a copy of the child data.
  auto a_statistics = std::make_shared<ArrayStatistics>();
  a_statistics->null_count = int64_t{0};
  a_statistics->distinct_count = int64_t{3};
  auto a_data = data->child_data[0]->Copy();
  a_data->statistics = std::move(a_statistics);
  data->child_data[0] = std::move(a_data);

  // Statistics of child "b", attached to a copy of the child data.
  auto b_statistics = std::make_shared<ArrayStatistics>();
  b_statistics->null_count = int64_t{1};
  b_statistics->max = 3.5;
  b_statistics->is_max_exact = true;
  b_statistics->min = 1.5;
  b_statistics->is_min_exact = true;
  auto b_data = data->child_data[1]->Copy();
  b_data->statistics = std::move(b_statistics);
  data->child_data[1] = std::move(b_data);

  auto struct_array = MakeArray(std::move(data));
  auto batch = RecordBatch::Make(::arrow::schema({field("col1", struct_type)}),
                                 struct_array->length(), {struct_array});

  ASSERT_OK_AND_ASSIGN(auto actual, batch->MakeStatisticsArray());

  ASSERT_OK_AND_ASSIGN(auto expected,
                       MakeStatisticsArray("[null, 0, 1, 2]",
                                           {{
                                                ARROW_STATISTICS_KEY_ROW_COUNT_EXACT,
                                            },
                                            {
                                                ARROW_STATISTICS_KEY_NULL_COUNT_EXACT,
                                            },
                                            {
                                                ARROW_STATISTICS_KEY_NULL_COUNT_EXACT,
                                                ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT,
                                            },
                                            {
                                                ARROW_STATISTICS_KEY_NULL_COUNT_EXACT,
                                                ARROW_STATISTICS_KEY_MIN_VALUE_EXACT,
                                                ARROW_STATISTICS_KEY_MAX_VALUE_EXACT,
                                            }},
                                           {{
                                                ArrayStatistics::ValueType{int64_t{3}},
                                            },
                                            {
                                                ArrayStatistics::ValueType{int64_t{0}},
                                            },
                                            {
                                                ArrayStatistics::ValueType{int64_t{0}},
                                                ArrayStatistics::ValueType{int64_t{3}},
                                            },
                                            {
                                                ArrayStatistics::ValueType{int64_t{1}},
                                                ArrayStatistics::ValueType{1.5},
                                                ArrayStatistics::ValueType{3.5},
                                            }}));
  AssertArraysEqual(*expected, *actual, true);
}

TEST_F(TestRecordBatch, MakeStatisticsArrayNullCountExact) {
auto schema =
::arrow::schema({field("no-statistics", boolean()), field("int32", int32())});
Expand Down
Loading