feat(compaction): preallocate fragment IDs to reduce reserve-commit amplification and conflicts #6004
Conversation
ACTION NEEDED: The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error, please inspect the "PR Title Check" action.
I think it would be useful to explore post-allocation of fragment IDs as an alternative to this pre-allocation solution:
Pre-allocation: We estimate the number of resulting fragments and do a single call to allocate fragment IDs. Since the fragment IDs are known during compaction, we can distribute transpose row_id operations (if enabled).
- Pros: If we estimate correctly, this is great!
- Cons: Incorrect estimates lead to overhead: (1) under-estimation causes additional `reserve_fragment_ids` invocations per task (so, the same as the current approach), or (2) over-estimation results in unused fragment IDs (probably not a large concern, but merits calling out).
Post-allocation: We wait until all fragments have been written and then allocate fragment IDs. This will require a post-processing step for transposing row_id.
- Pros: We do not have to estimate fragment counts, because these are known, so in every case there is a single `reserve_fragment_ids` call.
- Cons: We cannot distribute the `row_id` transpose operations; for large datasets (e.g., 1B+ rows) this can be on the order of seconds, I think.

IMO the call on whether to pre-allocate or post-allocate is a tradeoff between (1) our confidence in the estimation accuracy and (2) the cost of transposing `row_id`s.
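As a rough sketch of the pre-allocation estimate described above (hypothetical names, not the actual Lance API), the per-task output fragment count could be derived from the live row count and a target rows-per-fragment limit:

```rust
/// Hypothetical helper: estimate how many output fragments a compaction
/// task will produce, given its live rows and a rows-per-fragment target.
/// Illustrative only; a real estimator would also consider byte sizes.
fn estimate_output_fragment_num(live_rows: usize, max_rows_per_fragment: usize) -> usize {
    // Ceiling division: any remainder spills into one extra fragment.
    (live_rows + max_rows_per_fragment - 1) / max_rows_per_fragment
}

fn main() {
    // 1000 live rows at 400 rows per fragment -> 3 output fragments.
    println!("{}", estimate_output_fragment_num(1000, 400));
}
```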
    estimated_output_fragment_count: None,
    reserved_start: None,
    reserved_len: None,
This is just to make compilation succeed? Should we expose this in python / Java SDKs as first-class? Otherwise we can take it on as a follow up!
    /// default false
    pub prealloc_fragment_ids: bool,
    /// Expansion factor applied to preallocated fragment IDs for a compaction plan.
    /// default 1.05
Not sure if the `prealloc_fragment_factor` is useful. Basically, the algorithm assigns the estimated fragment count to each `TaskData` and then the last one takes the rest. So in practice, if there are 100 `TaskData` instances each with 2 fragments and the factor is 1.05 (210 reserved), 99 `TaskData` will each receive 2 assigned IDs, and the last `TaskData` will get the remaining 12 reserved fragment IDs. So I would challenge whether this configuration actually lets us tune between unused over-allocation and additional reservation overhead.
Nice catch! In fact, we can apply this magnification factor at the granularity of task allocation. Taking your example, that would be ceil(2 * 1.05) => 3 per task, and finally pre-allocate fragment IDs according to the total number of fragments allocated across all tasks.
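The per-task rounding suggested above can be sketched as follows (illustrative names, not the PR's actual code): apply the factor to each task's estimate with a ceiling, then reserve the sum for the whole plan.

```rust
/// Hypothetical sketch: apply the expansion factor per task, rounding up,
/// so every task gets at least one spare ID when the factor is > 1.
fn padded_task_alloc(per_task_estimate: usize, factor: f64) -> usize {
    ((per_task_estimate as f64) * factor).ceil() as usize
}

/// Sum the padded per-task allocations into one plan-level reservation.
fn total_reserved(task_estimates: &[usize], factor: f64) -> usize {
    task_estimates.iter().map(|&e| padded_task_alloc(e, factor)).sum()
}

fn main() {
    // 100 tasks of 2 fragments each at factor 1.05:
    // ceil(2 * 1.05) = 3 per task, so 300 IDs reserved in total
    // (versus 210 when the factor is applied to the plan total).
    let estimates = vec![2usize; 100];
    println!("{}", total_reserved(&estimates, 1.05));
}
```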
    })
    }

    fn sum_file_size(fragment: &Fragment) -> usize {
I think this is only used once, does it make sense to define a separate function for it?
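For context, a self-contained sketch of what such a helper looks like, with illustrative stand-in types (the real `Fragment` lives in the Lance format crate and has a different shape):

```rust
/// Illustrative stand-ins for the real types.
struct DataFile {
    size_bytes: usize,
}

struct Fragment {
    files: Vec<DataFile>,
}

/// Sum the on-disk size of all data files in a fragment.
fn sum_file_size(fragment: &Fragment) -> usize {
    fragment.files.iter().map(|f| f.size_bytes).sum()
}

fn main() {
    let frag = Fragment {
        files: vec![DataFile { size_bytes: 100 }, DataFile { size_bytes: 250 }],
    };
    println!("{}", sum_file_size(&frag));
}
```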
Would also like to comment here praising the potential perf improvements of this work for large compactions. I ran a test compacting a 100k-fragment dataset (16 rows per fragment) down to 144 fragments with 24 compactions in parallel. Without this it took 1h53m, and with pre-allocating fragment IDs it took 21m45s. So very significant!
Hi @hamersaw, thanks a lot for your attention to this issue.
The hot path is probably related to the following code, which needs to access every row and fragment involved in the compaction.
From the pre-allocation perspective, compatibility is better, but it indeed cannot guarantee contiguous fragment IDs. I have had a brief discussion with @jackye1995 about this :) Hi @wjones127, @jackye1995 and @Xuanwo, could you please share some thoughts? Thank you!
westonpace
left a comment
Here's my two cents:
- I am not concerned about the cost of the transpose. I do not think this should be significant when compared with all the other work that needs to be done. If needed we can parallelize across threads but even then I wouldn't bother.
- I am not worried about a few lost fragment ids from an over-estimate. We can have fragment gaps already due to failed compaction and I know we encounter this often in production.
- I am not worried about breaking changes to `RewriteResult`. These are fairly temporary, and this would only affect a compaction that starts on one version and commits on another, which seems rare and unlikely.
- In general, I think our estimates will probably be pretty good.
- I am slightly worried about out-of-order fragment ids due to an under-estimate. I just don't know if we happen to make any implicit assumptions that fragment ids are always ascending and bugs from this can be subtle.
- I do think the post-allocation is slightly less complex.
Given this, I lean slightly towards post-allocation.
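A minimal sketch of the post-allocation flow being leaned towards here (hypothetical helper; the real path would issue one `ReserveFragments` transaction and then run the `row_id` transpose step):

```rust
/// After all output fragments are written, reserve a contiguous ID range in
/// a single call and assign IDs in write order. Hypothetical signature; the
/// actual implementation would follow this with a row_id transpose.
fn assign_post_reserved_ids(fragment_count: u64, reserved_start: u64) -> Vec<u64> {
    (reserved_start..reserved_start + fragment_count).collect()
}

fn main() {
    // 3 fragments written, range reserved starting at ID 10 -> [10, 11, 12].
    println!("{:?}", assign_post_reserved_ids(3, 10));
}
```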
    let live_rows = bin.row_counts.iter().copied().sum::<usize>();
    let total_input_file_size =
        bin.fragments.iter().map(sum_file_size).sum::<usize>();
    let estimated_output_fragment_count = estimate_output_fragment_num(
Why is this an estimate? Is it because we might hit the size (bytes) limit when writing and end up with more fragments than expected?
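One plausible reason, sketched below with hypothetical names: the estimator has to respect both a row-count limit and a byte-size limit, and the byte side is based on *input* file sizes, so the actual written output can differ from the estimate.

```rust
/// Hypothetical estimator combining row and byte limits. Because the byte
/// term uses input sizes rather than the (unknown) output sizes, this is a
/// best-effort estimate, not an exact count.
fn estimate_fragments(
    live_rows: usize,
    input_bytes: usize,
    max_rows: usize,
    max_bytes: usize,
) -> usize {
    let by_rows = (live_rows + max_rows - 1) / max_rows;
    let by_bytes = (input_bytes + max_bytes - 1) / max_bytes;
    by_rows.max(by_bytes)
}

fn main() {
    // The row limit alone would give 2 fragments, but the byte limit forces 4.
    println!("{}", estimate_fragments(2000, 4_000_000, 1000, 1_000_000));
}
```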
    pub binary_copy_read_batch_bytes: Option<usize>,
    /// Whether to preallocate fragment IDs for a compaction plan.
    /// default false
    pub prealloc_fragment_ids: bool,
Why would I ever want this set to false?
Xuanwo
left a comment
Thank you @zhangyue19921010 for your work on this! I believe this PR actually combines two aspects: fragment ID allocation and the distribution of remap tasks.
My current idea is that we could first add post-reserve support. Then we can adapt this PR to implement the remap task distribution on top of that approach.
What are your thoughts? @zhangyue19921010 @hamersaw
First of all, thanks for all of your discussion and attention. After careful consideration, post-processing is indeed a more concise solution, so I will close this PR. Thanks for all your effort here @hamersaw @westonpace @Xuanwo 👍
Thank you @zhangyue19921010 again!
Background
At the moment, compaction calls `reserve_fragment_ids` in each task, and each call is effectively a commit (a `ReserveFragments` transaction). With large datasets (for example, compaction plans with up to 10K tasks), especially under concurrent compaction, this creates a large number of tiny commits.
As a result, we repeatedly trigger transaction conflict checks, which can significantly hurt performance and may even cause task starvation (some tasks take a very long time to finish).
What this PR introduces
This PR adds `prealloc_fragment_ids` for compaction. When enabled, the number of output fragments is estimated for the whole plan and the fragment IDs are reserved in a single call, with each task drawing from that reserved range.
This reduces commit amplification from per-task reservation to mostly plan-level reservation.
Compatibility and safety guarantees
- A new option (`prealloc_fragment_ids`) controls this behavior.
- An expansion factor (`fragment_id_prealloc_factor`) allows over-reservation to provide a safety margin.
- If the reserved IDs run out, a task falls back to an additional `reserve_fragment_ids` call to continue safely.
For example, here is an overview of the number of commits for compacting 1000 fragments: previously, roughly 100 commits in total.
After the optimization, only two commits are needed (the pre-allocate fragment IDs commit plus the compaction commit).
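Back-of-envelope view of those commit counts (assuming one reserve commit per task before this change, and a single plan-level reserve plus the final compaction commit after; illustrative only):

```rust
/// Rough commit counts before and after pre-allocation (illustrative).
fn commits_before(num_tasks: usize) -> usize {
    // One ReserveFragments commit per task, plus the final compaction commit.
    num_tasks + 1
}

fn commits_after() -> usize {
    // One plan-level ReserveFragments commit, plus the final compaction commit.
    2
}

fn main() {
    // A 100-task plan: ~101 commits before, 2 after.
    println!("{} -> {}", commits_before(100), commits_after());
}
```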