-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-50007: [C++][Parquet] Add bloom filter folding to automatically size SBBF filters #50008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,9 +91,9 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { | |
| "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")}); | ||
| schema.Init(root); | ||
|
|
||
| BloomFilterOptions bloom_filter_options{100, 0.05}; | ||
| BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05}; | ||
| const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( | ||
| bloom_filter_options.ndv, bloom_filter_options.fpp); | ||
| bloom_filter_options.ndv.value(), bloom_filter_options.fpp); | ||
| WriterProperties::Builder properties_builder; | ||
| properties_builder.enable_bloom_filter("c1", bloom_filter_options); | ||
| auto writer_properties = properties_builder.build(); | ||
|
|
@@ -150,6 +150,100 @@ TEST(BloomFilterBuilder, BasicRoundTrip) { | |
| } | ||
| } | ||
|
|
||
| namespace { | ||
|
|
||
| struct BloomFilterBuilderFoldingTestCase { | ||
| int64_t ndv; | ||
| bool fold; | ||
| int32_t inserted_count; | ||
| int64_t expected_bitset_ndv; | ||
| }; | ||
|
|
||
| class BloomFilterBuilderFoldingTest | ||
| : public ::testing::TestWithParam<BloomFilterBuilderFoldingTestCase> {}; | ||
|
|
||
| } // namespace | ||
|
|
||
| TEST_P(BloomFilterBuilderFoldingTest, RespectsOption) { | ||
| const auto& test_case = GetParam(); | ||
|
|
||
| SchemaDescriptor schema; | ||
| schema::NodePtr root = | ||
| schema::GroupNode::Make("schema", Repetition::REPEATED, {schema::ByteArray("c1")}); | ||
| schema.Init(root); | ||
|
|
||
| constexpr double kFpp = 0.05; | ||
| BloomFilterOptions bloom_filter_options{ | ||
| .ndv = test_case.ndv, .fpp = kFpp, .fold = test_case.fold}; | ||
| const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes( | ||
| bloom_filter_options.ndv.value(), bloom_filter_options.fpp); | ||
| WriterProperties::Builder properties_builder; | ||
| properties_builder.enable_bloom_filter("c1", bloom_filter_options); | ||
| auto writer_properties = properties_builder.build(); | ||
| auto bloom_filter_builder = BloomFilterBuilder::Make(&schema, writer_properties.get()); | ||
|
|
||
| bloom_filter_builder->AppendRowGroup(); | ||
| auto bloom_filter = bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0); | ||
| ASSERT_NE(bloom_filter, nullptr); | ||
| ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize()); | ||
|
|
||
| std::vector<uint64_t> hashes; | ||
| hashes.reserve(test_case.inserted_count); | ||
| for (int32_t i = 0; i < test_case.inserted_count; ++i) { | ||
| const auto hash = bloom_filter->Hash(i); | ||
| hashes.push_back(hash); | ||
| bloom_filter->InsertHash(hash); | ||
| } | ||
|
|
||
| auto sink = CreateOutputStream(); | ||
| auto locations = bloom_filter_builder->WriteTo(sink.get()); | ||
| ASSERT_EQ(locations.size(), 1); | ||
| ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); | ||
|
|
||
| const auto& location = locations.front().second; | ||
| ReaderProperties reader_properties; | ||
| ::arrow::io::BufferReader reader( | ||
| ::arrow::SliceBuffer(buffer, location.offset, location.length)); | ||
| auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader); | ||
|
|
||
| EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(test_case.expected_bitset_ndv, kFpp), | ||
| filter.GetBitsetSize()); | ||
| for (uint64_t hash : hashes) { | ||
| EXPECT_TRUE(filter.FindHash(hash)); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we check that most non-inserted values are not found, with an actual FPP value below
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will let each round of testing calculate the FPP for the 10,000 numbers that have not been inserted. |
||
|
|
||
| int32_t false_positives = 0; | ||
| constexpr int32_t kNonInsertedCount = 10'000; | ||
| for (int32_t i = test_case.inserted_count; | ||
| i < test_case.inserted_count + kNonInsertedCount; ++i) { | ||
| false_positives += filter.FindHash(filter.Hash(i)); | ||
| } | ||
| EXPECT_LT(static_cast<double>(false_positives) / kNonInsertedCount, kFpp); | ||
| } | ||
|
|
||
| INSTANTIATE_TEST_SUITE_P( | ||
| BloomFilterBuilder, BloomFilterBuilderFoldingTest, | ||
| ::testing::Values(BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000, | ||
| .fold = true, | ||
| .inserted_count = 1000, | ||
|
HuaHuaY marked this conversation as resolved.
|
||
| .expected_bitset_ndv = 1000}, | ||
| BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000, | ||
| .fold = false, | ||
| .inserted_count = 1000, | ||
| .expected_bitset_ndv = 1'000'000}, | ||
| BloomFilterBuilderFoldingTestCase{.ndv = 1024, | ||
| .fold = true, | ||
| .inserted_count = 1024, | ||
| .expected_bitset_ndv = 1024}, | ||
| BloomFilterBuilderFoldingTestCase{.ndv = 1024, | ||
| .fold = true, | ||
| .inserted_count = 0, | ||
| .expected_bitset_ndv = 0}, | ||
| BloomFilterBuilderFoldingTestCase{.ndv = 1024, | ||
| .fold = false, | ||
| .inserted_count = 0, | ||
| .expected_bitset_ndv = 1024})); | ||
|
|
||
| TEST(BloomFilterBuilder, InvalidOperations) { | ||
| SchemaDescriptor schema; | ||
| schema::NodePtr root = schema::GroupNode::Make( | ||
|
|
@@ -158,7 +252,7 @@ TEST(BloomFilterBuilder, InvalidOperations) { | |
| schema.Init(root); | ||
|
|
||
| WriterProperties::Builder properties_builder; | ||
| BloomFilterOptions bloom_filter_options{100, 0.05}; | ||
| BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05}; | ||
| properties_builder.enable_bloom_filter("c1", bloom_filter_options); | ||
| properties_builder.enable_bloom_filter("c2", bloom_filter_options); | ||
| auto properties = properties_builder.build(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this algorithm the actual size reduction will always be a power of 2 (
group_size = UINT32_C(1) << num_folds). Why aren't we trying to be more granular?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockSplitBloomFilter::Initwill check(num_bytes & (num_bytes - 1)) != 0. I didn't find this limitation in the Parquet documentation. But If we break the rule, old parquet reader will not be able to read the bloom filter.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha. We probably don't want to produce data that would be incompatible with old readers.
Does the power-of-two constraint serve a purpose? Perhaps we can remove it in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In any case, can you add a comment somewhere mentioning this restriction?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have add a comment in front of
group_size.After more thinking, I think the actual size reduction must be a power of 2. Because the block index is calculated by
static_cast<uint32_t>(((hash >> 32) * NumBlocks()) >> 32);, which is required by parquet's document. And we must ensure that the calculated block index is the same before and after the fold.