-
Notifications
You must be signed in to change notification settings - Fork 477
Compaction deletes non-shared files #6060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,9 @@ | |
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; | ||
| import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
@@ -67,6 +69,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) { | |
| this.newDatafile = newDatafile; | ||
| } | ||
|
|
||
| private record CompactionFileResult(TabletMetadata tabletMetadata, | ||
| ArrayList<StoredTabletFile> filesToDeleteViaGc) { | ||
| } | ||
|
|
||
| @Override | ||
| public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception { | ||
| var ecid = ExternalCompactionId.of(commitData.ecid); | ||
|
|
@@ -79,25 +85,25 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception { | |
| // process died and now it's running again. In this case commit should do nothing, but it's | ||
| // important to still carry on with the rest of the steps after commit. This code ignores the | ||
| // fact that a commit may not have happened in the current call and continues for this reason. | ||
| TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile); | ||
| CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile); | ||
|
|
||
| String loc = null; | ||
| if (tabletMetadata != null && tabletMetadata.getLocation() != null) { | ||
| loc = tabletMetadata.getLocation().getHostPortSession(); | ||
| if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) { | ||
| loc = fileResult.tabletMetadata.getLocation().getHostPortSession(); | ||
| } | ||
|
|
||
| // This will cause the tablet to be reexamined to see if it needs any more compactions. | ||
| var extent = KeyExtent.fromThrift(commitData.textent); | ||
| env.getEventPublisher().event(extent, "Compaction completed %s", extent); | ||
|
|
||
| return new PutGcCandidates(commitData, loc); | ||
| return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc()); | ||
| } | ||
|
|
||
| KeyExtent getExtent() { | ||
| return KeyExtent.fromThrift(commitData.textent); | ||
| } | ||
|
|
||
| private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, | ||
| private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid, | ||
| Optional<ReferencedTabletFile> newDatafile) { | ||
|
|
||
| var tablet = | ||
|
|
@@ -107,6 +113,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) | ||
| .logInterval(Duration.ofMinutes(3)).createRetry(); | ||
|
|
||
| ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>(); | ||
|
|
||
| while (canCommitCompaction(ecid, tablet)) { | ||
| CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); | ||
|
|
||
|
|
@@ -126,7 +134,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| } | ||
|
|
||
| // make the needed updates to the tablet | ||
| updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); | ||
| filesToDeleteViaGc = updateTabletForCompaction(ctx, commitData.stats, ecid, tablet, | ||
| newDatafile, ecm, tabletMutator); | ||
|
|
||
| tabletMutator.submit( | ||
| tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid), | ||
|
|
@@ -156,13 +165,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| } | ||
| } | ||
|
|
||
| return tablet; | ||
| return new CompactionFileResult(tablet, filesToDeleteViaGc); | ||
| } | ||
|
|
||
| private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, | ||
| TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm, | ||
| private ArrayList<StoredTabletFile> updateTabletForCompaction(ServerContext ctx, | ||
| TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet, | ||
| Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm, | ||
| Ample.ConditionalTabletMutator tabletMutator) { | ||
|
|
||
| ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>(); | ||
|
|
||
| if (ecm.getKind() == CompactionKind.USER) { | ||
| if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { | ||
| // all files selected for the user compactions are finished, so the tablet is finished and | ||
|
|
@@ -211,13 +223,51 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio | |
| // scan | ||
| ecm.getJobFiles().forEach(tabletMutator::putScan); | ||
| } | ||
| ecm.getJobFiles().forEach(tabletMutator::deleteFile); | ||
| for (StoredTabletFile file : ecm.getJobFiles()) { | ||
| // Check if file is shared | ||
| DataFileValue fileMetadata = tablet.getFilesMap().get(file); | ||
| boolean isShared = fileMetadata != null && fileMetadata.isShared(); | ||
|
|
||
| if (isShared) { | ||
| // File is shared - must use GC delete markers | ||
| LOG.debug("File {} is shared, will create GC delete marker", file.getFileName()); | ||
| filesToDeleteViaGc.add(file); | ||
| } else { | ||
| // File is not shared - try to delete directly | ||
| try { | ||
| Path filePath = new Path(file.getMetadataPath()); | ||
| boolean deleted = ctx.getVolumeManager().deleteRecursively(filePath); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Deleting files here is not correct because the conditional mutation to the metadata table has not yet been made. Only after successfully making the conditional mutation will we know which files were shared and which were not. We need to do something like the following.
There is existing code for comparing the file values, but it's not exactly what we need. Would probably need to create a new |
||
|
|
||
| if (deleted) { | ||
| LOG.debug("Successfully deleted non-shared compaction input file: {}", | ||
| file.getFileName()); | ||
| } else { | ||
| LOG.warn("Failed to delete non-shared file {}, will create GC delete marker", | ||
| file.getFileName()); | ||
| filesToDeleteViaGc.add(file); | ||
| } | ||
| } catch (IOException e) { | ||
| LOG.warn("Error deleting non-shared file {}, will create GC delete marker: {}", | ||
| file.getFileName(), e.getMessage()); | ||
| filesToDeleteViaGc.add(file); | ||
| } | ||
| } | ||
|
|
||
| // Always delete from tablet metadata | ||
| tabletMutator.deleteFile(file); | ||
| } | ||
|
|
||
| tabletMutator.deleteExternalCompaction(ecid); | ||
|
|
||
| if (newDatafile.isPresent()) { | ||
| tabletMutator.putFile(newDatafile.orElseThrow(), | ||
| new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); | ||
| // NEW: Mark new compaction output files as not shared | ||
| DataFileValue newFileValue = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), | ||
| System.currentTimeMillis(), false // New file created by this tablet alone - not shared | ||
| ); | ||
| tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue); | ||
| } | ||
|
|
||
| return filesToDeleteViaGc; | ||
| } | ||
|
|
||
| public static boolean canCommitCompaction(ExternalCompactionId ecid, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marking the source tablets as shared here has race conditions: the files being marked may not be the files in the clone. Also, marking them as shared must be done using a conditional writer that only changes the marking if the file exists in the tablet with the same value (this check must be done by the conditional mutation).
The following conditions must be true for clone to avoid race conditions.
Need to rework the code to do the following algorithm for each tablet.
The current code does this without step 1. Also, the current code batches tablet reads/writes for efficiency. The code would probably be much easier to understand without the batching, but doing one tablet at a time without batching would be really slow. So we need to modify the code to work in step 1 and still do the batching.