Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 87 additions & 5 deletions cpp/src/parquet/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <bit>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>

#include "arrow/io/memory.h"
#include "arrow/result.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/macros.h"

Expand Down Expand Up @@ -345,9 +348,89 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}

void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
const auto num_bits = static_cast<int64_t>(num_bytes_) * 8;
const auto total_set_bits =
::arrow::internal::CountSetBits(data_->data(), /*bit_offset=*/0, num_bits);
if (total_set_bits == 0) {
num_bytes_ = kMinimumBloomFilterBytes;
return;
}

const double avg_fill = static_cast<double>(total_set_bits) / num_bits;
const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp, avg_fill);
if (num_folds > 0) {
Fold(num_folds);
}
}

uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp,
double avg_fill) const {
const uint32_t num_blocks = NumBlocks();
if (num_blocks < 2) {
return 0;
}
DCHECK_EQ(num_blocks & (num_blocks - 1), 0);

// Estimate the fill rate after folding from the current average fill rate.
// Folding ORs block groups together, so each fold changes the estimated fill rate
// from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits, making
// the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
//
// See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
// https://hal.science/hal-01126174v1
const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));

uint32_t num_folds = 0;
double unset_probability_after_folds = 1.0 - avg_fill;
for (uint32_t i = 0; i < max_folds; ++i) {
unset_probability_after_folds *= unset_probability_after_folds;
const double folded_fill_rate = 1.0 - unset_probability_after_folds;
const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
if (estimated_fpp > target_fpp) {
break;
}
++num_folds;
}
return num_folds;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this algorithm the actual size reduction will always be a power of 2 (group_size = UINT32_C(1) << num_folds). Why aren't we trying to be more granular?

Copy link
Copy Markdown
Contributor Author

@HuaHuaY HuaHuaY Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockSplitBloomFilter::Init checks (num_bytes & (num_bytes - 1)) != 0. I didn't find this limitation in the Parquet documentation, but if we break the rule, old Parquet readers will not be able to read the bloom filter.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. We probably don't want to produce data that would be incompatible with old readers.

Does the power-of-two constraint serve a purpose? Perhaps we can remove it in a separate PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, can you add a comment somewhere mentioning this restriction?

Copy link
Copy Markdown
Contributor Author

@HuaHuaY HuaHuaY Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a comment in front of group_size.

  // A fold group is a consecutive run of blocks ORed into one output block.
  // Keeping the group size as (1 << num_folds) preserves a power-of-two bitset
  // size. Folding by this power-of-two group size keeps the old-to-new bucket
  // remapping aligned with bucket lookup and avoids false negatives.
  const uint32_t group_size = UINT32_C(1) << num_folds;

After more thought, I think the actual size reduction must be a power of 2, because the block index is calculated by static_cast<uint32_t>(((hash >> 32) * NumBlocks()) >> 32);, as required by the Parquet specification, and we must ensure that the calculated block index stays consistent before and after the fold.

}

void BlockSplitBloomFilter::Fold(uint32_t num_folds) {
Comment thread
HuaHuaY marked this conversation as resolved.
DCHECK_GT(num_folds, 0);

const uint32_t num_blocks = NumBlocks();
// A fold group is a consecutive run of blocks ORed into one output block.
// Keeping the group size as (1 << num_folds) preserves a power-of-two bitset
// size. Folding by this power-of-two group size keeps the old-to-new bucket
// remapping aligned with bucket lookup and avoids false negatives.
const uint32_t group_size = UINT32_C(1) << num_folds;
Comment thread
HuaHuaY marked this conversation as resolved.
DCHECK_LE(group_size, num_blocks);

const uint32_t new_num_blocks = num_blocks / group_size;
auto* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());

for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) {
uint32_t* dst = bitset32 + dst_block * kBitsSetPerBlock;

const uint32_t src_block = dst_block * group_size;
const uint32_t* src = bitset32 + src_block * kBitsSetPerBlock;
if (dst != src) {
std::copy_n(src, kBitsSetPerBlock, dst);
}

for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) {
src = bitset32 + (src_block + fold_block) * kBitsSetPerBlock;
for (int word = 0; word < kBitsSetPerBlock; ++word) {
dst[word] |= src[word];
}
}
}

num_bytes_ = new_num_blocks * kBytesPerFilterBlock;
Comment thread
HuaHuaY marked this conversation as resolved.
}

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * NumBlocks()) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());

Expand All @@ -363,8 +446,7 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
}

void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * NumBlocks()) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());

Expand Down
13 changes: 11 additions & 2 deletions cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
/// @param fpp The false positive probability.
/// @return it always returns a value between kMinimumBloomFilterBytes and
/// kMaximumBloomFilterBytes, and the return value is always a power of 2
static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
static uint32_t OptimalNumOfBytes(uint64_t ndv, double fpp) {
uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp);
ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits));
return optimal_num_of_bits >> 3;
Expand All @@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
/// @param fpp The false positive probability.
/// @return it always returns a value between kMinimumBloomFilterBytes * 8 and
/// kMaximumBloomFilterBytes * 8, and the return value is always a power of 16
static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) {
Comment thread
wgtmac marked this conversation as resolved.
ARROW_DCHECK(fpp > 0.0 && fpp < 1.0);
const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
uint32_t num_bits;
Expand Down Expand Up @@ -276,6 +276,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
bool FindHash(uint64_t hash) const override;
void InsertHash(uint64_t hash) override;
void InsertHashes(const uint64_t* hashes, int num_values) override;
/// Fold the bloom filter down to the smallest size that still meets the target FPP
/// (False Positive Probability).
void FoldToTargetFpp(double target_fpp);
void WriteTo(ArrowOutputStream* sink) const override;
uint32_t GetBitsetSize() const override { return num_bytes_; }

Expand Down Expand Up @@ -350,6 +353,12 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {

private:
inline void InsertHashImpl(uint64_t hash);
/// Number of kBytesPerFilterBlock-sized blocks in the current bitset.
/// The bitset size is expected to be a whole multiple of the block size
/// (checked in debug builds).
uint32_t NumBlocks() const {
  ARROW_DCHECK_EQ(num_bytes_ % kBytesPerFilterBlock, 0);
  const uint32_t block_count = num_bytes_ / kBytesPerFilterBlock;
  return block_count;
}
uint32_t NumFoldsForTargetFpp(double target_fpp, double avg_fill) const;
void Fold(uint32_t num_folds);

// Bytes in a tiny Bloom filter block.
static constexpr int kBytesPerFilterBlock = 32;
Expand Down
100 changes: 97 additions & 3 deletions cpp/src/parquet/bloom_filter_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
"schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
schema.Init(root);

BloomFilterOptions bloom_filter_options{100, 0.05};
BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
bloom_filter_options.ndv, bloom_filter_options.fpp);
bloom_filter_options.ndv.value(), bloom_filter_options.fpp);
WriterProperties::Builder properties_builder;
properties_builder.enable_bloom_filter("c1", bloom_filter_options);
auto writer_properties = properties_builder.build();
Expand Down Expand Up @@ -150,6 +150,100 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
}
}

namespace {

// One scenario for the parameterized folding round-trip test.
// Members are initialized with designated initializers, so their declaration
// order must not change.
struct BloomFilterBuilderFoldingTestCase {
// NDV passed to BloomFilterOptions; it determines the initial bitset size.
int64_t ndv;
// Value of the `fold` option passed to BloomFilterOptions.
bool fold;
// Number of hashes inserted into the filter before it is written.
int32_t inserted_count;
// The written bitset is expected to have the size that
// BlockSplitBloomFilter::OptimalNumOfBytes() returns for this NDV.
int64_t expected_bitset_ndv;
};

// Fixture that supplies a BloomFilterBuilderFoldingTestCase to each test run.
class BloomFilterBuilderFoldingTest
: public ::testing::TestWithParam<BloomFilterBuilderFoldingTestCase> {};

} // namespace

TEST_P(BloomFilterBuilderFoldingTest, RespectsOption) {
const auto& test_case = GetParam();

SchemaDescriptor schema;
schema::NodePtr root =
schema::GroupNode::Make("schema", Repetition::REPEATED, {schema::ByteArray("c1")});
schema.Init(root);

constexpr double kFpp = 0.05;
BloomFilterOptions bloom_filter_options{
.ndv = test_case.ndv, .fpp = kFpp, .fold = test_case.fold};
const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
bloom_filter_options.ndv.value(), bloom_filter_options.fpp);
WriterProperties::Builder properties_builder;
properties_builder.enable_bloom_filter("c1", bloom_filter_options);
auto writer_properties = properties_builder.build();
auto bloom_filter_builder = BloomFilterBuilder::Make(&schema, writer_properties.get());

bloom_filter_builder->AppendRowGroup();
auto bloom_filter = bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0);
ASSERT_NE(bloom_filter, nullptr);
ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize());

std::vector<uint64_t> hashes;
hashes.reserve(test_case.inserted_count);
for (int32_t i = 0; i < test_case.inserted_count; ++i) {
const auto hash = bloom_filter->Hash(i);
hashes.push_back(hash);
bloom_filter->InsertHash(hash);
}

auto sink = CreateOutputStream();
auto locations = bloom_filter_builder->WriteTo(sink.get());
ASSERT_EQ(locations.size(), 1);
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

const auto& location = locations.front().second;
ReaderProperties reader_properties;
::arrow::io::BufferReader reader(
::arrow::SliceBuffer(buffer, location.offset, location.length));
auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader);

EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(test_case.expected_bitset_ndv, kFpp),
filter.GetBitsetSize());
for (uint64_t hash : hashes) {
EXPECT_TRUE(filter.FindHash(hash));
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check that most non-inserted values are not found, with an actual FPP value below kFpp?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have each test case compute the actual FPP over 10,000 values that were not inserted.


int32_t false_positives = 0;
constexpr int32_t kNonInsertedCount = 10'000;
for (int32_t i = test_case.inserted_count;
i < test_case.inserted_count + kNonInsertedCount; ++i) {
false_positives += filter.FindHash(filter.Hash(i));
}
EXPECT_LT(static_cast<double>(false_positives) / kNonInsertedCount, kFpp);
}

INSTANTIATE_TEST_SUITE_P(
BloomFilterBuilder, BloomFilterBuilderFoldingTest,
::testing::Values(BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
.fold = true,
.inserted_count = 1000,
Comment thread
HuaHuaY marked this conversation as resolved.
.expected_bitset_ndv = 1000},
BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
.fold = false,
.inserted_count = 1000,
.expected_bitset_ndv = 1'000'000},
BloomFilterBuilderFoldingTestCase{.ndv = 1024,
.fold = true,
.inserted_count = 1024,
.expected_bitset_ndv = 1024},
BloomFilterBuilderFoldingTestCase{.ndv = 1024,
.fold = true,
.inserted_count = 0,
.expected_bitset_ndv = 0},
BloomFilterBuilderFoldingTestCase{.ndv = 1024,
.fold = false,
.inserted_count = 0,
.expected_bitset_ndv = 1024}));

TEST(BloomFilterBuilder, InvalidOperations) {
SchemaDescriptor schema;
schema::NodePtr root = schema::GroupNode::Make(
Expand All @@ -158,7 +252,7 @@ TEST(BloomFilterBuilder, InvalidOperations) {
schema.Init(root);

WriterProperties::Builder properties_builder;
BloomFilterOptions bloom_filter_options{100, 0.05};
BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
properties_builder.enable_bloom_filter("c1", bloom_filter_options);
properties_builder.enable_bloom_filter("c2", bloom_filter_options);
auto properties = properties_builder.build();
Expand Down
35 changes: 26 additions & 9 deletions cpp/src/parquet/bloom_filter_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,16 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
const WriterProperties* properties_;
bool finished_ = false;

using RowGroupBloomFilters =
std::map</*column_id=*/int32_t, std::shared_ptr<BloomFilter>>;
// Bloom filters created for a single row group, keyed by column ordinal.
struct RowGroupBloomFilters {
// A bloom filter together with the options needed when it is written out.
// Initialized with designated initializers, so member order must not change.
struct BloomFilterEntry {
// The filter handed out to the caller by CreateBloomFilter().
std::shared_ptr<BlockSplitBloomFilter> filter;
// FPP taken from the column's bloom filter options; passed to
// FoldToTargetFpp() when folding is enabled.
double target_fpp;
// Taken from the column's `fold` option; when true, WriteTo() folds the
// filter before serializing it.
bool try_fold;
};

std::map</*column_id=*/int32_t, BloomFilterEntry> entries;
};

std::vector<RowGroupBloomFilters> bloom_filters_; // indexed by row group ordinal
};

Expand All @@ -206,17 +214,23 @@ BloomFilter* BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {

CheckState(column_ordinal);

auto& curr_rg_bfs = *bloom_filters_.rbegin();
auto& curr_rg_bfs = bloom_filters_.back().entries;
if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) {
std::stringstream ss;
ss << "Bloom filter already exists for column: " << column_ordinal
<< ", row group: " << (bloom_filters_.size() - 1);
throw ParquetException(ss.str());
}

auto bf = std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv, opts->fpp));
return curr_rg_bfs.emplace(column_ordinal, std::move(bf)).first->second.get();
ARROW_DCHECK(opts->ndv.has_value());
auto bf = std::make_shared<BlockSplitBloomFilter>(properties_->memory_pool());
bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(), opts->fpp));
return curr_rg_bfs
.emplace(
column_ordinal,
RowGroupBloomFilters::BloomFilterEntry{
.filter = std::move(bf), .target_fpp = opts->fpp, .try_fold = opts->fold})
.first->second.filter.get();
}

IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) {
Expand All @@ -228,11 +242,14 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
IndexLocations locations;

for (size_t i = 0; i != bloom_filters_.size(); ++i) {
auto& row_group_bloom_filters = bloom_filters_[i];
for (const auto& [column_id, filter] : row_group_bloom_filters) {
auto& row_group_bloom_filters = bloom_filters_[i].entries;
for (auto& [column_id, entry] : row_group_bloom_filters) {
// TODO(GH-43138): Determine the quality of bloom filter before writing it.
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
filter->WriteTo(sink);
if (entry.try_fold) {
entry.filter->FoldToTargetFpp(entry.target_fpp);
}
entry.filter->WriteTo(sink);
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());

if (pos - offset > std::numeric_limits<int32_t>::max()) {
Expand Down
Loading
Loading