
feat(compaction): single reserve_fragment_ids after rewriting files#6072

Merged
westonpace merged 8 commits intolance-format:mainfrom
hamersaw:feature/post-allocate-compaction
Mar 4, 2026

Conversation

@hamersaw (Contributor) commented Mar 2, 2026

This PR fixes the issue with concurrent fragment ID reservations during parallelized compactions by moving both the fragment reservation operations and the row ID remapping out of each individual compaction task and into the main thread. This ensures that fragment reservation occurs exactly once during compaction.

This is an alternative approach to pre-computing fragment IDs based on fragment count estimates. Discussion comparing the approaches should occur on the issue to maintain a central thread.

Closes #6075

Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
@github-actions github-actions bot added enhancement New feature or request java labels Mar 2, 2026
@hamersaw hamersaw changed the title feat(compaction): single reserve_fragment_ids after compaction feat(compaction): single reserve_fragment_ids after rewriting files Mar 2, 2026
@github-actions bot commented Mar 2, 2026

Review

Clean optimization that batches fragment ID reservations — moving reserve_fragment_ids from per-task in rewrite_files to a single call in commit_compaction. This reduces version bumps during compaction (confirmed by the test change from version 5→3). The RewriteResult simplification (merging row_id_map + changed_row_addrs into a single row_addrs field) is a nice cleanup.

No P0 or P1 issues found.

Minor observations (non-blocking):

  1. Improved robustness for empty tasks: The old needs_remapping path did task.row_id_map.unwrap() which would panic on the empty-fragment early-return path (where row_id_map was None). The new if let Some(...) pattern handles this gracefully. Same improvement applies to the defer_index_remap path with unwrap_or_else. Nice side-effect of the refactor.

  2. Java breaking change: RewriteResult.java removes getRowIdMap() and changes the constructor signature. Since RewriteResult is an internal serialization object passed between workers, this should be fine — but worth noting for any downstream Java consumers.
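The robustness improvement in observation 1 can be sketched with a minimal stand-in. The `Task` struct and function below are hypothetical, not Lance's actual types; they only illustrate the `unwrap()` vs `if let Some(...)` difference:

```rust
// Illustrative stand-in for the per-task compaction result; the struct and
// field names here are hypothetical, not Lance's actual types. `row_id_map`
// is None on the empty-fragment early-return path.
struct Task {
    row_id_map: Option<Vec<(u64, u64)>>,
}

// New pattern: skip tasks with no map instead of calling unwrap(), which
// would panic on an empty task under the old code path.
fn collect_row_id_map(tasks: Vec<Task>) -> Vec<(u64, u64)> {
    let mut row_id_map = Vec::new();
    for task in tasks {
        if let Some(map) = task.row_id_map {
            row_id_map.extend(map);
        }
    }
    row_id_map
}

fn main() {
    let tasks = vec![
        Task { row_id_map: None }, // empty fragment: old code would panic here
        Task { row_id_map: Some(vec![(0, 100), (1, 101)]) },
    ];
    assert_eq!(collect_row_id_map(tasks), vec![(0, 100), (1, 101)]);
}
```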

@codecov bot commented Mar 2, 2026

Codecov Report

❌ Patch coverage is 92.30769% with 6 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
rust/lance/src/dataset/optimize.rs | 92.30% | 6 Missing ⚠️


@LuQQiu (Contributor) left a comment:

Mainly worried about external usage leveraging these deleted RewriteResult fields directly in their workloads. Does this count as a BREAKING CHANGE?

/// Only set when index remap is deferred after compaction
pub changed_row_addrs: Option<Vec<u8>>,
/// Serialized RoaringTreemap of row addresses read from the original fragments.
/// Some for address-style row IDs, None for stable row IDs.
Contributor:

"Some for address-style row IDs, None for stable row IDs"? A little confusing for the stable row ID case.

Contributor Author (@hamersaw):

Updated the comment to be a little more clear. Let me know what you think!

Comment on lines +1314 to +1323
if let Some(row_addrs_bytes) = task.row_addrs {
let row_addrs =
RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?;
let transposed = remapping::transpose_row_addrs(
row_addrs,
&task.original_fragments,
&task.new_fragments,
);
row_id_map.extend(transposed);
}
Contributor:

Thanks for your effort here. As discussed, compared to pre-allocation (#6004), the main difference here is that the row_id_map computation has changed from parallel to serial.

Member:

Is the transpose a significant amount of work compared to the I/O cost of the rewrites themselves? If so I wonder if we can optimize the transposition process somehow.

@hamersaw (Contributor, Author) commented Mar 3, 2026:

I tested compaction with 100k fragments, where each has 16 rows of <int32, int32>, so I assume this is close to the worst-case scenario: the highest transposition cost relative to I/O cost (although a couple of large fragments may actually be worse). I compacted down to 144 fragments with 24-way parallelism:
pre-allocate: 21m49s
post-allocate: 22m15s

So roughly 26s more for serial transposition, a little over 1% slower. IIUC it's an entirely in-memory operation, and even if done in parallel we would need to aggregate the results in the main thread before writing. I was more worried about memory utilization at 1B+ rows, but RAM usage should be the same between the two approaches.

hamersaw added 6 commits March 2, 2026 21:23
@westonpace (Member) left a comment:

I added some more thoughts on #6004 as well

Comment on lines +870 to +879
/// Serialized `RoaringTreemap` of the original row addresses read from the
/// source fragments during compaction.
///
/// `Some` for datasets using address-style row IDs. During commit, these
/// addresses are transposed to the new fragment layout to build an old-to-new
/// mapping for index remapping. Also triggers fragment ID reservation since
/// address-style fragments don't have pre-allocated IDs.
///
/// `None` for datasets using stable row IDs, which handle remapping via
/// row ID sequences instead.
Member:

How does a single bitmap express a mapping? Can we expand on this comment a bit to explain?

Contributor Author (@hamersaw):

Yeah, this is my third iteration of the comment already; it's a tough one. This is just the sequence of origin row IDs. The Option is there because they are only useful when we're not using stable row IDs. I tried to address Lu's previous comment by explaining how they are used: basically, with stable row IDs we don't need them; with defer_index_remap we flush them; otherwise we use them (along with the post-allocated fragment IDs) for the remapping.

I'll take another stab at making this more clear!

Member:

I think I see, and I remember the problem now. We calculate the target IDs on the fly once we get the fragment IDs, because it's easy (one contiguous sequence per fragment based on ID). The source IDs are trickier because we don't know how many holes there were in the source fragments due to deletions, so these need to be written down. (Although, as I'm explaining this, part of me wonders if we could just use the deletion maps on the source fragments instead; but let's not make this more complicated than it needs to be 😆)
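The transposition idea can be sketched with plain stdlib types: a sorted `Vec<u64>` stands in for the serialized `RoaringTreemap`, and the function and field names are illustrative, not Lance's actual API. A single bitmap suffices because the target side can be regenerated on the fly:

```rust
use std::collections::BTreeMap;

// Helper: a row address packs (fragment_id, offset) into one u64.
fn addr(frag: u64, off: u64) -> u64 {
    (frag << 32) | off
}

// Sketch of the transposition described above. Target addresses are one
// contiguous run per new fragment, so they can be generated on the fly;
// only the source side (with its deletion holes) has to be written down.
fn transpose_row_addrs(
    source_addrs: &[u64],         // sorted surviving source addresses
    new_fragments: &[(u64, u64)], // (new_fragment_id, row_count) in write order
) -> BTreeMap<u64, u64> {
    let targets = new_fragments
        .iter()
        .flat_map(|&(frag_id, rows)| (0..rows).map(move |off| addr(frag_id, off)));
    source_addrs.iter().copied().zip(targets).collect()
}

fn main() {
    // Two source fragments (ids 0 and 1) with holes from deletions,
    // compacted into one new fragment (id 7) holding the 4 surviving rows.
    let sources = vec![addr(0, 0), addr(0, 2), addr(1, 1), addr(1, 3)];
    let map = transpose_row_addrs(&sources, &[(7, 4)]);
    assert_eq!(map[&addr(0, 0)], addr(7, 0));
    assert_eq!(map[&addr(1, 3)], addr(7, 3));
}
```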

Contributor Author (@hamersaw):

Oh sure, so we could potentially not even need this field if we wanted to get fancy. I convinced myself that this replaces the existing two fields and will be strictly smaller (byte-wise), so it shouldn't be an issue. But if we think it's worth diving into further optimizations, I could take a stab at it.

row_id_map.extend(transposed);
}
} else if options.defer_index_remap {
let changed_row_addrs = task.row_addrs.unwrap_or_else(|| {
Member:

Should it be an error if row_addrs is not set in this branch? Or does that just mean stable row ids?

Contributor Author (@hamersaw):

Yes, this would silently produce errors in remapping. I've updated it to return an error. Basically, we can reach this branch if defer_index_remap and use_stable_row_ids are configured in tandem, which is an invalid configuration that IIRC we catch elsewhere. Even if that case is unreachable, we should explicitly fail here.
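The fail-loudly change can be sketched with `Option::ok_or_else`. `std::io::Error` stands in for Lance's error type here, and the function name is illustrative, not the actual implementation:

```rust
use std::io::{Error, ErrorKind};

// Sketch of failing explicitly when row_addrs is missing in the
// defer_index_remap branch, instead of defaulting to an empty value and
// silently corrupting the later index remap.
fn changed_row_addrs(row_addrs: Option<Vec<u8>>) -> Result<Vec<u8>, Error> {
    row_addrs.ok_or_else(|| {
        Error::new(
            ErrorKind::InvalidInput,
            "defer_index_remap requires address-style row IDs (row_addrs was None)",
        )
    })
}

fn main() {
    assert!(changed_row_addrs(Some(vec![1, 2, 3])).is_ok());
    assert!(changed_row_addrs(None).is_err());
}
```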


@Xuanwo (Collaborator) left a comment:

Thank you, @hamersaw and @zhangyue19921010, for working on this. Both solutions look good, but I prefer the simpler one, so I lean toward the post-allocation solution.


if needs_remapping {
row_id_map.extend(task.row_id_map.unwrap());
if let Some(row_addrs_bytes) = task.row_addrs {
Collaborator:

It seems impossible for needs_remapping to be true while task.row_addrs is None. However, if it does happen, we currently ignore it silently. Should we return an error instead, or add a comment clarifying that ignoring it is the intended behavior?

Comment on lines +1289 to +1297
let has_address_style = completed_tasks.iter().any(|t| t.row_addrs.is_some());
if has_address_style {
let frags: Vec<&mut Fragment> = completed_tasks
.iter_mut()
.filter(|t| t.row_addrs.is_some())
.flat_map(|t| t.new_fragments.iter_mut())
.collect();
reserve_fragment_ids(dataset, frags.into_iter()).await?;
}
Collaborator:

Nit: we can avoid the double loop and the Vec here:

let mut frags = completed_tasks
    .iter_mut()
    .filter(|t| t.row_addrs.is_some())
    .flat_map(|t| t.new_fragments.iter_mut())
    .peekable();

let has_address_style = frags.peek().is_some();

if has_address_style {
    reserve_fragment_ids(dataset, frags).await?;
}
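The peek-then-consume trick in the suggestion generalizes to any "do X only if the filtered iterator is non-empty" situation. A self-contained stdlib sketch (the function and data are illustrative, not Lance code):

```rust
// Demonstrates the peek-then-consume pattern: one pass both decides
// emptiness and feeds the consumer, with no intermediate Vec.
fn sum_evens_if_any(values: &[i32]) -> Option<i32> {
    let mut evens = values.iter().copied().filter(|v| v % 2 == 0).peekable();
    if evens.peek().is_some() {
        Some(evens.sum()) // consume the same iterator we peeked at
    } else {
        None // nothing matched; skip the work entirely
    }
}

fn main() {
    assert_eq!(sum_evens_if_any(&[1, 2, 3, 4]), Some(6));
    assert_eq!(sum_evens_if_any(&[1, 3, 5]), None);
}
```

One caveat of the pattern: `peek()` buffers an element inside the `Peekable`, so the original iterator must be consumed through the `Peekable` wrapper, exactly as the suggestion does by passing `frags` itself to `reserve_fragment_ids`.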

row_id_map.extend(task.row_id_map.unwrap());
if let Some(row_addrs_bytes) = task.row_addrs {
let row_addrs =
RoaringTreemap::deserialize_from(&mut Cursor::new(&row_addrs_bytes))?;
Collaborator:

Maybe we can store row_addrs_bytes in Bytes so that we don't need Cursor's help.
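For what it's worth, the Cursor may already be avoidable: `&[u8]` implements `std::io::Read`, so a byte slice can be handed straight to any reader-based deserializer. A stdlib-only sketch, where the hypothetical `read_u32` stands in for something like `RoaringTreemap::deserialize_from`:

```rust
use std::io::Read;

// Reads a 4-byte little-endian u32 from any reader; a stand-in for a
// reader-based deserializer such as RoaringTreemap::deserialize_from.
fn read_u32(reader: &mut impl Read) -> std::io::Result<u32> {
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf)?;
    Ok(u32::from_le_bytes(buf))
}

fn main() {
    let bytes = 42u32.to_le_bytes().to_vec();
    // No Cursor needed: &[u8] implements Read, so a mutable slice
    // reference works directly (the slice advances as it is read).
    let mut slice: &[u8] = &bytes;
    assert_eq!(read_u32(&mut slice).unwrap(), 42);
}
```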

@zhangyue19921010 (Contributor) commented:

This post-allocation scheme is indeed simpler and more reasonable, +1 for this :)
In addition, regarding the performance of row_id_map in large-data-volume scenarios: on the one hand, we can try parallel processing if needed; on the other hand, we can also split a huge compaction into multiple medium-sized compactions and execute them step by step, as mentioned in #6039.

@westonpace westonpace merged commit c60913a into lance-format:main Mar 4, 2026
36 checks passed
@hamersaw hamersaw deleted the feature/post-allocate-compaction branch March 4, 2026 21:14
wjones127 pushed a commit to wjones127/lance that referenced this pull request Mar 4, 2026
…ance-format#6072)


Labels

enhancement New feature or request java


Development

Successfully merging this pull request may close these issues.

Reduce thundering herd of reserve fragment ID calls during parallelized compaction

5 participants