diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java index 1402cb83118..45dc75d4611 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java @@ -28,17 +28,34 @@ public class DataFileValue { private final long size; private final long numEntries; private long time = -1; + private boolean shared; + + public DataFileValue(long size, long numEntries, long time, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = time; + this.shared = shared; + } + + public DataFileValue(long size, long numEntries, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = -1; + this.shared = shared; + } public DataFileValue(long size, long numEntries, long time) { this.size = size; this.numEntries = numEntries; this.time = time; + this.shared = false; } public DataFileValue(long size, long numEntries) { this.size = size; this.numEntries = numEntries; this.time = -1; + this.shared = false; } public DataFileValue(String encodedDFV) { @@ -46,11 +63,19 @@ public DataFileValue(String encodedDFV) { size = Long.parseLong(ba[0]); numEntries = Long.parseLong(ba[1]); + time = -1; + shared = false; if (ba.length == 3) { - time = Long.parseLong(ba[2]); - } else { - time = -1; + // Could be old format with time, or new format with shared + try { + time = Long.parseLong(ba[2]); + } catch (NumberFormatException e) { + shared = Boolean.parseBoolean(ba[2]); + } + } else if (ba.length == 4) { + shared = Boolean.parseBoolean(ba[2]); + time = Long.parseLong(ba[3]); } } @@ -74,15 +99,19 @@ public long getTime() { return time; } + public boolean isShared() { + return shared; + } + public byte[] encode() { return encodeAsString().getBytes(UTF_8); } public String encodeAsString() { if (time >= 0) { - return ("" + size 
+ "," + numEntries + "," + time); + return ("" + size + "," + numEntries + "," + shared + "," + time); } - return ("" + size + "," + numEntries); + return ("" + size + "," + numEntries + "," + shared); } public Value encodeAsValue() { @@ -93,7 +122,8 @@ public Value encodeAsValue() { public boolean equals(Object o) { if (o instanceof DataFileValue odfv) { - return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time; + return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time + && shared == odfv.shared; } return false; @@ -101,12 +131,12 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Long.valueOf(size + numEntries).hashCode(); + return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode(); } @Override public String toString() { - return size + " " + numEntries; + return size + " " + numEntries + " " + shared; } public void setTime(long time) { @@ -116,6 +146,10 @@ public void setTime(long time) { this.time = time; } + public void setShared(boolean shared) { + this.shared = shared; + } + /** * @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise. 
*/ diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index d702b3a4f83..2d59bd0c9b1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -179,6 +179,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon return new Pair<>(result, sizes); } + private static void markSourceFilesAsShared(TableId srcTableId, TableId tableId, + AccumuloClient client, BatchWriter bw) throws MutationsRejectedException { + + log.info("Marking source table {} files as shared after clone to {}", srcTableId, tableId); + + try (TabletsMetadata srcTM = createCloneScanner(null, srcTableId, client)) { + for (TabletMetadata srcTablet : srcTM) { + + if (srcTablet.getFiles().isEmpty()) { + continue; + } + + Mutation m = new Mutation(srcTablet.getExtent().toMetaRow()); + boolean hasChanges = false; + + // Update each file's DataFileValue to mark as shared + for (var entry : srcTablet.getFilesMap().entrySet()) { + StoredTabletFile file = entry.getKey(); + DataFileValue dfv = entry.getValue(); + + if (!dfv.isShared()) { + DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(), + dfv.isTimeSet() ? 
dfv.getTime() : -1, true); + + m.put(DataFileColumnFamily.NAME, file.getMetadataText(), sharedDfv.encodeAsValue()); + hasChanges = true; + + log.debug("Marking file {} as shared in source tablet {}", file.getFileName(), + srcTablet.getExtent()); + } + } + + if (hasChanges) { + bw.addMutation(m); + } + } + } + + bw.flush(); + log.info("Finished marking source table {} files as shared", srcTableId); + } + private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, Iterable> tablet) { @@ -191,7 +233,13 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, if (!cf.startsWith("../") && !cf.contains(":")) { cf = "../" + srcTableId + entry.getKey().getColumnQualifier(); } - m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue()); + + DataFileValue ogVal = new DataFileValue(entry.getValue().get()); + DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(), + ogVal.isTimeSet() ? ogVal.getTime() : -1, true); + + // FIXED: Use newSharedVal instead of original value + m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue()); } else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue()); } else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) { @@ -380,6 +428,8 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId } } + markSourceFilesAsShared(srcTableId, tableId, context, bw); + // delete the clone markers and create directory entries Scanner mscanner = context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 
bf54f9b836f..4c07be71eb8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -25,7 +25,9 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -67,6 +69,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) { this.newDatafile = newDatafile; } + private record CompactionFileResult(TabletMetadata tabletMetadata, + ArrayList filesToDeleteViaGc) { + } + @Override public Repo call(FateId fateId, FateEnv env) throws Exception { var ecid = ExternalCompactionId.of(commitData.ecid); @@ -79,25 +85,25 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { // process died and now its running again. In this case commit should do nothing, but its // important to still carry on with the rest of the steps after commit. This code ignores a that // fact that a commit may not have happened in the current call and continues for this reason. - TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile); + CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile); String loc = null; - if (tabletMetadata != null && tabletMetadata.getLocation() != null) { - loc = tabletMetadata.getLocation().getHostPortSession(); + if (fileResult != null && fileResult.tabletMetadata != null + && fileResult.tabletMetadata.getLocation() != null) { + loc = fileResult.tabletMetadata.getLocation().getHostPortSession(); } // This will causes the tablet to be reexamined to see if it needs any more compactions. 
var extent = KeyExtent.fromThrift(commitData.textent); env.getEventPublisher().event(extent, "Compaction completed %s", extent); - return new PutGcCandidates(commitData, loc); + return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc()); } KeyExtent getExtent() { return KeyExtent.fromThrift(commitData.textent); } - private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, + private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid, Optional newDatafile) { var tablet = @@ -107,6 +113,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) .logInterval(Duration.ofMinutes(3)).createRetry(); + ArrayList filesToDeleteViaGc = new ArrayList<>(); + while (canCommitCompaction(ecid, tablet)) { CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); @@ -126,7 +134,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId } // make the needed updates to the tablet - updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); + filesToDeleteViaGc = updateTabletForCompaction(ctx, commitData.stats, ecid, tablet, + newDatafile, ecm, tabletMutator); tabletMutator.submit( tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid), @@ -156,13 +165,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId } } - return tablet; + return new CompactionFileResult(tablet, filesToDeleteViaGc); } - private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, - TabletMetadata tablet, Optional newDatafile, CompactionMetadata ecm, + private ArrayList updateTabletForCompaction(ServerContext ctx, + TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet, + Optional newDatafile, CompactionMetadata ecm, Ample.ConditionalTabletMutator 
tabletMutator) { + ArrayList filesToDeleteViaGc = new ArrayList<>(); + if (ecm.getKind() == CompactionKind.USER) { if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { // all files selected for the user compactions are finished, so the tablet is finish and @@ -211,13 +223,51 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio // scan ecm.getJobFiles().forEach(tabletMutator::putScan); } - ecm.getJobFiles().forEach(tabletMutator::deleteFile); + for (StoredTabletFile file : ecm.getJobFiles()) { + // Check if file is shared + DataFileValue fileMetadata = tablet.getFilesMap().get(file); + boolean isShared = fileMetadata != null && fileMetadata.isShared(); + + if (isShared) { + // File is shared - must use GC delete markers + LOG.debug("File {} is shared, will create GC delete marker", file.getFileName()); + filesToDeleteViaGc.add(file); + } else { + // File is not shared - try to delete directly + try { + Path filePath = new Path(file.getMetadataPath()); + boolean deleted = ctx.getVolumeManager().deleteRecursively(filePath); + + if (deleted) { + LOG.debug("Successfully deleted non-shared compaction input file: {}", + file.getFileName()); + } else { + LOG.warn("Failed to delete non-shared file {}, will create GC delete marker", + file.getFileName()); + filesToDeleteViaGc.add(file); + } + } catch (IOException e) { + LOG.warn("Error deleting non-shared file {}, will create GC delete marker: {}", + file.getFileName(), e.getMessage()); + filesToDeleteViaGc.add(file); + } + } + + // Always delete from tablet metadata + tabletMutator.deleteFile(file); + } + tabletMutator.deleteExternalCompaction(ecid); if (newDatafile.isPresent()) { - tabletMutator.putFile(newDatafile.orElseThrow(), - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + // NEW: Mark new compaction output files as not shared + DataFileValue newFileValue = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), + 
System.currentTimeMillis(), false // New file created by this tablet alone - not shared + ); + tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue); } + + return filesToDeleteViaGc; } public static boolean canCommitCompaction(ExternalCompactionId ecid, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java index 3933a625048..b244356046b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java @@ -18,27 +18,52 @@ */ package org.apache.accumulo.manager.compaction.coordinator.commit; +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.gc.ReferenceFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; import org.apache.accumulo.manager.tableOps.FateEnv; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PutGcCandidates extends AbstractFateOperation { private static final long serialVersionUID = 1L; private final CompactionCommitData commitData; private final String refreshLocation; + private final ArrayList filesToDeleteViaGc; + private static final Logger LOG = LoggerFactory.getLogger(PutGcCandidates.class); - public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) { + public PutGcCandidates(CompactionCommitData commitData, String refreshLocation, + ArrayList filesToDeleteViaGc) { this.commitData = commitData; this.refreshLocation = 
refreshLocation; + this.filesToDeleteViaGc = new ArrayList<>(); + for (StoredTabletFile file : filesToDeleteViaGc) { + this.filesToDeleteViaGc.add(file.getMetadataPath()); + } } @Override public Repo call(FateId fateId, FateEnv env) throws Exception { + if (filesToDeleteViaGc != null && !filesToDeleteViaGc.isEmpty()) { + var extent = KeyExtent.fromThrift(commitData.textent); + List deleteMarkers = new ArrayList<>(); - // add the GC candidates - env.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles()); + for (String filePath : filesToDeleteViaGc) { + StoredTabletFile file = new StoredTabletFile(filePath); + deleteMarkers.add(ReferenceFile.forFile(extent.tableId(), file)); + LOG.debug("Creating GC delete marker for file: {}", file.getFileName()); + } + + env.getContext().getAmple().putGcFileAndDirCandidates(extent.tableId(), deleteMarkers); + LOG.debug("Created {} GC delete markers for compaction", deleteMarkers.size()); + } if (refreshLocation == null) { env.recordCompactionCompletion(ExternalCompactionId.of(commitData.ecid)); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 88238036e38..3f1ebf9aa45 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -167,7 +167,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws this.loadingFiles = new HashMap<>(); } - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { Map toLoad = new HashMap<>(); for (var fileInfo : files) { @@ -245,16 +245,18 @@ void load(List tablets, Files files) { ReferencedTabletFile refTabFile = entry.getKey(); Bulk.FileInfo fileInfo = entry.getValue(); + boolean isShared = 
sharedFiles.contains(fileInfo.getFileName()); DataFileValue dfv; if (setTime) { // This should always be set outside the loop when setTime is true and should not be // null at this point Preconditions.checkState(fileTime != null); - dfv = - new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime); + dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), + fileTime, isShared); } else { - dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()); + dfv = + new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), isShared); } filesToLoad.put(refTabFile, dfv); @@ -402,6 +404,32 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, String fmtTid = fateId.getTxUUIDStr(); log.trace("{}: Started loading files at row: {}", fmtTid, startRow); + List> allEntries = new ArrayList<>(); + while (lmi.hasNext()) { + allEntries.add(lmi.next()); + } + + if (allEntries.isEmpty()) { + log.warn("{}: No files to load", fateId.getTxUUIDStr()); + return 0; + } + + Map fileTabletCount = new HashMap<>(); + for (var entry : allEntries) { + for (var fileInfo : entry.getValue()) { + String fileName = fileInfo.getFileName(); + fileTabletCount.merge(fileName, 1, Integer::sum); + } + } + + Set sharedFiles = fileTabletCount.entrySet().stream().filter(e -> e.getValue() > 1) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + + log.debug("{}: Detected {} shared files out of {} total files", fmtTid, sharedFiles.size(), + fileTabletCount.size()); + + startRow = allEntries.get(0).getKey().prevEndRow(); + loader.start(bulkDir, bulkInfo.tableId, fateId, bulkInfo.setTime); ImportTimingStats importTimingStats = new ImportTimingStats(); @@ -410,12 +438,11 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow); try { PeekingIterator pi = new PeekingIterator<>(tabletsMetadata.iterator()); - while 
(lmi.hasNext()) { - loadMapEntry = lmi.next(); + for (var entry : allEntries) { // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look // at the next skipDistance tablets before recreating the iterator if (skipDistance > 0) { - final KeyExtent loadMapKey = loadMapEntry.getKey(); + final KeyExtent loadMapKey = entry.getKey(); if (!pi.findWithin( tm -> PREV_COMP.compare(tm.getPrevEndRow(), loadMapKey.prevEndRow()) >= 0, skipDistance)) { @@ -428,8 +455,8 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, } } List tablets = - findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats); - loader.load(tablets, loadMapEntry.getValue()); + findOverlappingTablets(fmtTid, entry.getKey(), pi, importTimingStats); + loader.load(tablets, entry.getValue(), sharedFiles); } } finally { tabletsMetadata.close(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index f3147165b16..0497cf3da02 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -132,6 +132,7 @@ static Map> getNewTabletFiles(Fate Function fileInfoProvider) { Map> tabletsFiles = new TreeMap<>(); + Map fileTabletCount = new HashMap<>(); newTablets.keySet().forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); @@ -175,6 +176,8 @@ static Map> getNewTabletFiles(Fate double numOverlapping = newTablets.keySet().stream().map(KeyExtent::toDataRange).filter(overlapPredicate).count(); + fileTabletCount.put(file, (int) numOverlapping); + if (numOverlapping == 0) { log.debug("{} File {} with range {} that does not overlap tablet {}", fateId, file, fileRange, tabletMetadata.getExtent()); @@ -186,8 +189,9 @@ static Map> getNewTabletFiles(Fate // add the file to the tablets it 
overlaps newTablets.keySet().forEach(newTablet -> { if (overlapPredicate.apply(newTablet.toDataRange())) { + boolean isShared = numOverlapping > 1; DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet, - dataFileValue.getTime()); + dataFileValue.getTime(), isShared); tabletsFiles.get(newTablet).put(file, ndfv); } }); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java index 3782572bacf..71fdc48ac4b 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo; @@ -95,7 +96,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws } @Override - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { results.add(new LoadResult(tablets, files)); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 0e242f75e04..c707b6a7293 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -135,14 +135,14 @@ public void testFileParitioning() { var firstAndLastKeys = Map.of(file2, newFileInfo("m", "r"), file3, newFileInfo("g", "x"), file4, newFileInfo("s", "v")); - var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new 
DataFileValue(1000, 100)); - var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100)); - var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20), file3, - new DataFileValue(1000, 100), file4, new DataFileValue(4000, 400)); - var ke4Expected = - Map.of(file1, new DataFileValue(250, 25, 20), file3, new DataFileValue(1000, 100)); + var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true), file4, new DataFileValue(4000, 400, false)); + var ke4Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true)); var expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -165,9 +165,9 @@ public void testFileParitioning() { // Test a tablet with no files going to it var tabletFiles2 = Map.of(file2, tabletFiles.get(file2), file4, tabletFiles.get(file4)); - ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke3Expected = Map.of(file4, new DataFileValue(4000, 400)); + ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke3Expected = Map.of(file4, new DataFileValue(4000, 400, false)); ke4Expected = Map.of(); expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -209,10 +209,10 @@ public void testManyColumns() throws Exception { var flid2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); var loaded = Map.of(loaded1, 
flid1, loaded2, flid2); - var dfv1 = new DataFileValue(1000, 100, 20); - var dfv2 = new DataFileValue(500, 50, 20); - var dfv3 = new DataFileValue(4000, 400); - var dfv4 = new DataFileValue(2000, 200); + var dfv1 = new DataFileValue(1000, 100, 20, true); + var dfv2 = new DataFileValue(500, 50, 20, false); + var dfv3 = new DataFileValue(4000, 400, false); + var dfv4 = new DataFileValue(2000, 200, true); var tabletFiles = Map.of(file1, dfv1, file2, dfv2, file3, dfv3, file4, dfv4); @@ -307,7 +307,7 @@ public void testManyColumns() throws Exception { .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet1Mutator); - EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putFile(file2, dfv2)).andReturn(tablet1Mutator); // SplitInfo marked as system generated so should be set to ALWAYS (0 delay) @@ -340,10 +340,10 @@ public void testManyColumns() throws Exception { .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putFile(file3, dfv3)).andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet2Mutator); tablet2Mutator.submit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); @@ -356,9 +356,9 @@ public void testManyColumns() throws Exception { EasyMock.expect(tablet3Mutator.requireAbsentLogs()).andReturn(tablet3Mutator); 
EasyMock.expect(tablet3Mutator.putPrevEndRow(newExtent3.prevEndRow())) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file2)).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file3)).andReturn(tablet3Mutator); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 1c96bddbe70..c2bfdb473b3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -407,7 +407,7 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil Span span2 = TraceUtil.startSpan(this.getClass(), "minorCompact::bringOnline"); try (Scope scope = span2.makeCurrent()) { bringMinorCompactionOnline(tmpDatafile, newDatafile, - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false), commitSession, flushId, mincReason); } catch (Exception e) { final ServiceLock tserverLock = tabletServer.getLock(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java new file mode 100644 index 00000000000..09c465eced0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; 
+import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionFileDeleteIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(CompactionFileDeleteIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + /** + * Test that files created by minor compaction are marked as not shared + */ + @Test + public void testMinorCompactionCreatesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(tableName, null, null, true); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Map filesSharedStatus = getFilesSharedStatus(client, tableId); + + assertFalse(filesSharedStatus.isEmpty(), "Expected at least one file after flush"); + filesSharedStatus.forEach((file, isShared) -> { + assertFalse(isShared, "File created by minor compaction should not be marked as shared: " + + file.getFileName()); + log.info("Verified file {} is not shared", file.getFileName()); + }); + } + } + + /** + * Test that files are marked as shared during table clone + */ + @Test + public void testCloneMarksFilesAsShared() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String[] names = getUniqueNames(2); + String sourceTable = names[0]; + String cloneTable = names[1]; + + client.tableOperations().create(sourceTable); + TableId sourceTableId = TableId.of(client.tableOperations().tableIdMap().get(sourceTable)); + + try (BatchWriter bw = 
client.createBatchWriter(sourceTable)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(sourceTable, null, null, true); + + Map sourceFilesBefore = getFilesSharedStatus(client, sourceTableId); + assertFalse(sourceFilesBefore.isEmpty(), "Expected files in source table"); + sourceFilesBefore + .forEach((file, isShared) -> assertFalse(isShared, "Source files should not be shared")); + + client.tableOperations().clone(sourceTable, cloneTable, true, null, null); + TableId cloneTableId = TableId.of(client.tableOperations().tableIdMap().get(cloneTable)); + + Map sourceFilesAfter = getFilesSharedStatus(client, sourceTableId); + sourceFilesAfter.forEach((file, isShared) -> { + assertTrue(isShared, + "Source file " + file.getFileName() + " should be marked as shared after clone"); + log.info("Verified source file {} is shared after clone", file.getFileName()); + }); + + Map cloneFiles = getFilesSharedStatus(client, cloneTableId); + cloneFiles.forEach((file, isShared) -> { + assertTrue(isShared, "Clone file " + file.getFileName() + " should be marked as shared"); + log.info("Verified clone file {} is shared", file.getFileName()); + }); + } + } + + /** + * Test that bulk import marks files as shared or not based on how many tablets they go to + */ + @Test + public void testBulkImportMarksFilesCorrectly() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + NewTableConfiguration ntc = new NewTableConfiguration(); + SortedSet splits = new TreeSet<>(); + splits.add(new Text("row0500")); + ntc.withSplits(splits); + client.tableOperations().create(tableName, ntc); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { 
+ Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + + Map> tabletFiles = getFilesPerTablet(client, tableId); + assertEquals(2, tabletFiles.size(), "Expected 2 tablets"); + + tabletFiles.forEach((tablet, files) -> { + files.forEach((file, isShared) -> { + long tabletCount = tabletFiles.values().stream() + .filter(tabletFileMap -> tabletFileMap.containsKey(file)).count(); + + if (tabletCount == 1) { + assertFalse(isShared, "File in single tablet should not be shared"); + } + }); + }); + } + } + + /** + * Test that compaction processes non-shared files correctly + */ + @Test + public void testCompactionDeletesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + for (int batch = 0; batch < 4; batch++) { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value_" + batch + "_" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + } + + Map before = getFilesSharedStatus(client, tableId); + int beforeCount = before.size(); + log.info("Files before compaction: {}", beforeCount); + assertTrue(beforeCount >= 3, "Expected at least 3 files"); + before.forEach((f, s) -> assertFalse(s, "Files should not be shared")); + + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + Map after = getFilesSharedStatus(client, tableId); + int afterCount = after.size(); + log.info("Files after compaction: {}", afterCount); + + assertTrue(afterCount <= beforeCount, "Expected same or fewer files after compaction"); + 
after.forEach((f, s) -> assertFalse(s, "Compaction output should not be shared")); + + } + } + + /** + * Get all files and their shared status for a table + */ + private Map getFilesSharedStatus(AccumuloClient client, + TableId tableId) { + Map result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + result.put(file, dfv.isShared()); + + log.trace("File: {}, Size: {}, Entries: {}, Shared: {}", file.getFileName(), + dfv.getSize(), dfv.getNumEntries(), dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry", e); + } + } + } catch (Exception e) { + log.error("Error reading files for table {}", tableId, e); + } + + return result; + } + + /** + * Get files per tablet with their shared status + */ + private Map> getFilesPerTablet(AccumuloClient client, + TableId tableId) { + Map> result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME); + + String currentTablet = null; + Map currentFiles = new HashMap<>(); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cf = entry.getKey().getColumnFamily().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + 
String value = entry.getValue().toString(); + + if (currentTablet == null || !currentTablet.equals(row)) { + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + currentTablet = row; + currentFiles = new HashMap<>(); + } + + if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.STR_NAME)) { + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + currentFiles.put(file, dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry for tablet {}", row, e); + } + } + } + + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + } catch (Exception e) { + log.error("Error reading files per tablet for table {}", tableId, e); + } + + return result; + } + + /** + * Get GC file candidates for a table + */ + private Set getGcFileCandidates(AccumuloClient client, TableId tableId) { + Set candidates = new HashSet<>(); + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.DeletesSection.getRange()); + + scanner.forEach(entry -> { + try { + String row = entry.getKey().getRow().toString(); + String decodedPath = MetadataSchema.DeletesSection.decodeRow(row); + + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (cq.equals(tableId.canonical()) || value.equals(tableId.canonical())) { + candidates.add(decodedPath); + log.debug("Found GC delete marker for table {}: {}", tableId, decodedPath); + } + } catch (Exception e) { + log.warn("Error parsing GC delete marker entry", e); + } + }); + } catch (Exception e) { + log.warn("Error reading GC candidates", e); + } + return candidates; + } +}