feat(compaction): single reserve_fragment_ids after rewriting files #6072
Conversation
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Review: A clean optimization that batches fragment ID reservations by moving them into the main thread. No P0 or P1 issues found. Minor observations (non-blocking):
LuQQiu
left a comment
Mainly worried that some external usage leverages these deleted RewriteResult fields directly in their workloads; does this count as a BREAKING CHANGE?
rust/lance/src/dataset/optimize.rs
Outdated
/// Only set when index remap is deferred after compaction
pub changed_row_addrs: Option<Vec<u8>>,
/// Serialized RoaringTreemap of row addresses read from the original fragments.
/// Some for address-style row IDs, None for stable row IDs.
"Some for address-style row IDs, None for stable row IDs."? This is a little confusing for the stable row ID case.
Updated the comment to be a little more clear. Let me know what you think!
if let Some(row_addrs_bytes) = task.row_addrs {
    let row_addrs =
        RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?;
    let transposed = remapping::transpose_row_addrs(
        row_addrs,
        &task.original_fragments,
        &task.new_fragments,
    );
    row_id_map.extend(transposed);
}
Thanks for your effort here. As we discussed, compared to the pre-allocation approach in #6004, the main difference here is that the row_id_map computation changes from parallel to serial.
Is the transpose a significant amount of work compared to the I/O cost of the rewrites themselves? If so I wonder if we can optimize the transposition process somehow.
I tested compaction with 100k fragments, where each has 16 rows of <int32, int32>. So I assume this is close to the worst-case scenario of the highest row count (transposition cost) relative to I/O cost; actually, I guess a couple of large fragments may be worse. I compacted down to 144 fragments with a parallelism of 24:
pre-allocate: 21m49s
post-allocate: 22m15s
So about 26s more for serial transposition, a little over 1% slower. IIUC it's an entirely in-memory operation, and even if done in parallel we would need to aggregate the results in the main thread before writing. I was more worried about memory utilization on 1B+ rows, but RAM should be the same between them.
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
This reverts commit 2bdb40d.
westonpace
left a comment
I added some more thoughts on #6004 as well
rust/lance/src/dataset/optimize.rs
Outdated
/// Serialized `RoaringTreemap` of the original row addresses read from the
/// source fragments during compaction.
///
/// `Some` for datasets using address-style row IDs. During commit, these
/// addresses are transposed to the new fragment layout to build an old-to-new
/// mapping for index remapping. Also triggers fragment ID reservation since
/// address-style fragments don't have pre-allocated IDs.
///
/// `None` for datasets using stable row IDs, which handle remapping via
/// row ID sequences instead.
How does a single bitmap express a mapping? Can we expand on this comment a bit to explain?
Yeah, this is already my third iteration of the comment; it's a tough one. This is just the sequence of original row addresses. The Option is because they are only useful if we're not using stable row IDs. I tried to address Lu's previous comment by explaining how they are used: if stable row IDs, we don't need them; if defer_index_remap, we flush them; otherwise we use them (along with post-allocated fragment IDs) for the remapping.
I'll take another stab at making this more clear!
I think I see and I remember the problem now. We calculate the target ids on the fly once we get the fragment ids because it's easy (one contiguous sequence per fragment based on id). The source ids are trickier because we don't know how many holes there were in the source fragments due to deletions. So these need to be written down (although as I'm explaining this part of me wonders if we could just use the deletion maps on the source fragments instead but let's not make this more complicated than it needs to be 😆)
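To make that mechanism concrete, here is a std-only sketch of the transposition idea described above. It is a simplification with hypothetical names, using a plain `HashMap` and a sorted `Vec<u64>` in place of the crate's `RoaringTreemap`: the recorded source addresses (with deletion holes) are zipped against target addresses generated as one contiguous sequence per new fragment.

```rust
use std::collections::HashMap;

/// Pack a fragment id and a local row offset into a 64-bit row address,
/// mirroring address-style row IDs (fragment id in the high 32 bits).
fn row_addr(frag_id: u64, offset: u64) -> u64 {
    (frag_id << 32) | offset
}

/// Simplified sketch: the source addresses are a sorted sequence (with holes
/// where rows were deleted), while the target addresses can be generated on
/// the fly as one contiguous run per new fragment, so zipping the two
/// sequences yields the old-to-new mapping.
fn transpose(
    source_addrs: &[u64],         // sorted, with deletion holes
    new_fragments: &[(u64, u64)], // (fragment id, physical row count)
) -> HashMap<u64, u64> {
    let targets = new_fragments
        .iter()
        .flat_map(|&(id, rows)| (0..rows).map(move |off| row_addr(id, off)));
    source_addrs.iter().copied().zip(targets).collect()
}

fn main() {
    // Fragment 0 had rows 0..4 but row 1 was deleted; fragment 1 had rows 0..2.
    let sources = vec![
        row_addr(0, 0), row_addr(0, 2), row_addr(0, 3),
        row_addr(1, 0), row_addr(1, 1),
    ];
    // All five surviving rows land in one new fragment with id 7.
    let map = transpose(&sources, &[(7, 5)]);
    assert_eq!(map[&row_addr(0, 2)], row_addr(7, 1));
    assert_eq!(map[&row_addr(1, 1)], row_addr(7, 4));
    println!("ok");
}
```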
Oh sure, so we could potentially not even need this field if we wanted to get fancy. I convinced myself that this is replacing the existing two fields and will be strictly smaller (bytes-wise), so it shouldn't be an issue. But if we think it's worth diving into further optimizations, I could take a stab at it.
rust/lance/src/dataset/optimize.rs
Outdated
        row_id_map.extend(transposed);
    }
} else if options.defer_index_remap {
    let changed_row_addrs = task.row_addrs.unwrap_or_else(|| {
Should it be an error if row_addrs is not set in this branch? Or does that just mean stable row ids?
Yes, this would silently produce errors in remapping. I've updated it to return an error. Basically, we can reach this branch if defer_index_remap and use_stable_row_ids are configured in tandem, which is an invalid configuration that IIRC we catch elsewhere. Even if that case is unreachable, we should explicitly fail here.
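A minimal std-only sketch of the explicit-failure pattern being described (names and error type are hypothetical, standing in for the crate's own error handling):

```rust
/// Hypothetical helper: fail explicitly when row_addrs is missing in the
/// defer_index_remap branch, rather than silently defaulting.
fn take_row_addrs(row_addrs: Option<Vec<u8>>) -> Result<Vec<u8>, String> {
    // Reaching this branch without row addresses implies defer_index_remap
    // was combined with stable row IDs, an invalid configuration.
    row_addrs.ok_or_else(|| {
        "defer_index_remap requires address-style row IDs, but row_addrs was not set".to_string()
    })
}

fn main() {
    assert!(take_row_addrs(Some(vec![1, 2])).is_ok());
    assert!(take_row_addrs(None).is_err());
    println!("ok");
}
```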
Xuanwo
left a comment
Thank you, @hamersaw and @zhangyue19921010, for working on this. Both solutions look good, but I prefer the simpler one, so I lean toward the post-allocation solution.
if needs_remapping {
    row_id_map.extend(task.row_id_map.unwrap());
    if let Some(row_addrs_bytes) = task.row_addrs {
It seems impossible for needs_remapping to be true while task.row_addrs is none. However, if it does happen, we currently ignore it silently. Should we return an error instead, or add a comment to clarify that ignoring it is the intended behavior?
let has_address_style = completed_tasks.iter().any(|t| t.row_addrs.is_some());
if has_address_style {
    let frags: Vec<&mut Fragment> = completed_tasks
        .iter_mut()
        .filter(|t| t.row_addrs.is_some())
        .flat_map(|t| t.new_fragments.iter_mut())
        .collect();
    reserve_fragment_ids(dataset, frags.into_iter()).await?;
}
Nit: we can avoid the double loop and the Vec here:

let mut frags = completed_tasks
    .iter_mut()
    .filter(|t| t.row_addrs.is_some())
    .flat_map(|t| t.new_fragments.iter_mut())
    .peekable();
let has_address_style = frags.peek().is_some();
if has_address_style {
    reserve_fragment_ids(dataset, frags).await?;
}

row_id_map.extend(task.row_id_map.unwrap());
if let Some(row_addrs_bytes) = task.row_addrs {
    let row_addrs =
        RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?;
Maybe we can store the row_addrs_bytes in Bytes so that we don't need the Cursor helper.
This post-allocation scheme is indeed simpler and more reasonable, +1 for this :)
This PR fixes the issue with concurrent fragment ID reservations during parallelized compactions by moving both the fragment reservation operations and the row ID remapping out of each individual task compaction and into the main thread. This ensures that fragment reservation occurs exactly once during compaction.
This is an alternative approach to pre-computing fragment IDs based on fragment count estimates. Discussion comparing the approaches should occur on the issue to maintain a central thread.
Closes #6075
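In outline, the fix amounts to replacing many concurrent per-task reservations with one batched reservation after all rewrites finish. A minimal std-only sketch of that single-reservation idea (the allocator and its names are hypothetical, standing in for the dataset's fragment ID counter):

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the dataset's fragment ID counter.
struct IdAllocator {
    next: AtomicU64,
}

impl IdAllocator {
    /// Reserve a contiguous block of `count` fragment IDs in one step.
    fn reserve(&self, count: u64) -> Range<u64> {
        let start = self.next.fetch_add(count, Ordering::SeqCst);
        start..start + count
    }
}

fn main() {
    let alloc = IdAllocator { next: AtomicU64::new(100) };
    // Three completed rewrite tasks produced 2, 1, and 3 new fragments;
    // the main thread reserves IDs for all six with a single call instead
    // of each task reserving concurrently.
    let new_fragment_counts = [2u64, 1, 3];
    let total: u64 = new_fragment_counts.iter().sum();
    let ids = alloc.reserve(total);
    assert_eq!(ids, 100..106);
    println!("ok");
}
```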