From e6a82f818f178a59c89aa685759a58b34c3aeae0 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 15 Jan 2026 13:09:16 -0500 Subject: [PATCH 1/3] Compaction deletes non-shared files --- .../core/metadata/schema/DataFileValue.java | 51 +- .../server/util/MetadataTableUtil.java | 52 +- .../coordinator/commit/CommitCompaction.java | 74 ++- .../coordinator/commit/PutGcCandidates.java | 31 +- .../manager/tableOps/bulkVer2/LoadFiles.java | 32 +- .../manager/tableOps/split/UpdateTablets.java | 8 +- .../tableOps/bulkVer2/LoadFilesTest.java | 3 +- .../accumulo/tserver/tablet/Tablet.java | 2 +- .../functional/CompactionFileDeleteIT.java | 459 ++++++++++++++++++ 9 files changed, 680 insertions(+), 32 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java index 1402cb83118..d2d265e2e18 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java @@ -28,17 +28,34 @@ public class DataFileValue { private final long size; private final long numEntries; private long time = -1; + private boolean shared; + + public DataFileValue(long size, long numEntries, long time, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = time; + this.shared = shared; + } + + public DataFileValue(long size, long numEntries, boolean shared) { + this.size = size; + this.numEntries = numEntries; + this.time = -1; + this.shared = shared; + } public DataFileValue(long size, long numEntries, long time) { this.size = size; this.numEntries = numEntries; this.time = time; + this.shared = false; } public DataFileValue(long size, long numEntries) { this.size = size; this.numEntries = numEntries; this.time = -1; + this.shared = false; } public DataFileValue(String encodedDFV) { @@ -46,11 +63,20 @@ public DataFileValue(String encodedDFV) { size = Long.parseLong(ba[0]); numEntries = Long.parseLong(ba[1]); - - if (ba.length == 3) { - time = Long.parseLong(ba[2]); - } else { - time = -1; + time = -1; + shared = false; + + if (ba.length >= 3) { + // Field 3 could be either time (old format) or shared (new format) + // Try to parse as time first (for backward compatibility) + try { + time = Long.parseLong(ba[2]); + } catch (NumberFormatException e) { + shared = Boolean.parseBoolean(ba[2]); + } + } + if (ba.length == 4) { + time = Long.parseLong(ba[3]); } } @@ -74,15 +100,19 @@ public long getTime() { return time; } + public boolean isShared() { + return shared; + } + public byte[] encode() { return encodeAsString().getBytes(UTF_8); } public String encodeAsString() { if (time >= 0) { - return ("" + size + "," + numEntries + "," + time); + return ("" + size + "," + numEntries + "," + shared + "," + time); } - return ("" + size + "," + numEntries); + return ("" + size + "," + numEntries + "," + shared); } public Value encodeAsValue() { @@ -93,7 +123,8 @@ public Value encodeAsValue() { public boolean equals(Object o) { if (o instanceof DataFileValue odfv) { - return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time; + return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time + && shared == odfv.shared; } return false; @@ -116,6 +147,10 @@ public void setTime(long time) { this.time = time; } + public void setShared(boolean shared) { + this.shared = shared; + } + /** * @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise. */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index d702b3a4f83..2d59bd0c9b1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -179,6 +179,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon return new Pair<>(result, sizes); } + private static void markSourceFilesAsShared(TableId srcTableId, TableId tableId, + AccumuloClient client, BatchWriter bw) throws MutationsRejectedException { + + log.info("Marking source table {} files as shared after clone to {}", srcTableId, tableId); + + try (TabletsMetadata srcTM = createCloneScanner(null, srcTableId, client)) { + for (TabletMetadata srcTablet : srcTM) { + + if (srcTablet.getFiles().isEmpty()) { + continue; + } + + Mutation m = new Mutation(srcTablet.getExtent().toMetaRow()); + boolean hasChanges = false; + + // Update each file's DataFileValue to mark as shared + for (var entry : srcTablet.getFilesMap().entrySet()) { + StoredTabletFile file = entry.getKey(); + DataFileValue dfv = entry.getValue(); + + if (!dfv.isShared()) { + DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(), + dfv.isTimeSet() ? dfv.getTime() : -1, true); + + m.put(DataFileColumnFamily.NAME, file.getMetadataText(), sharedDfv.encodeAsValue()); + hasChanges = true; + + log.debug("Marking file {} as shared in source tablet {}", file.getFileName(), + srcTablet.getExtent()); + } + } + + if (hasChanges) { + bw.addMutation(m); + } + } + } + + bw.flush(); + log.info("Finished marking source table {} files as shared", srcTableId); + } + private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, Iterable> tablet) { @@ -191,7 +233,13 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, if (!cf.startsWith("../") && !cf.contains(":")) { cf = "../" + srcTableId + entry.getKey().getColumnQualifier(); } - m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue()); + + DataFileValue ogVal = new DataFileValue(entry.getValue().get()); + DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(), + ogVal.isTimeSet() ? ogVal.getTime() : -1, true); + + // FIXED: Use newSharedVal instead of original value + m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue()); } else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue()); } else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) { @@ -380,6 +428,8 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId } } + markSourceFilesAsShared(srcTableId, tableId, context, bw); + // delete the clone markers and create directory entries Scanner mscanner = context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index bf54f9b836f..4c07be71eb8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -25,7 +25,9 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -67,6 +69,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) { this.newDatafile = newDatafile; } + private record CompactionFileResult(TabletMetadata tabletMetadata, + ArrayList filesToDeleteViaGc) { + } + @Override public Repo call(FateId fateId, FateEnv env) throws Exception { var ecid = ExternalCompactionId.of(commitData.ecid); @@ -79,25 +85,25 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { // process died and now its running again. In this case commit should do nothing, but its // important to still carry on with the rest of the steps after commit. This code ignores a that // fact that a commit may not have happened in the current call and continues for this reason. - TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile); + CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile); String loc = null; - if (tabletMetadata != null && tabletMetadata.getLocation() != null) { - loc = tabletMetadata.getLocation().getHostPortSession(); + if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) { + loc = fileResult.tabletMetadata.getLocation().getHostPortSession(); } // This will causes the tablet to be reexamined to see if it needs any more compactions. var extent = KeyExtent.fromThrift(commitData.textent); env.getEventPublisher().event(extent, "Compaction completed %s", extent); - return new PutGcCandidates(commitData, loc); + return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc()); } KeyExtent getExtent() { return KeyExtent.fromThrift(commitData.textent); } - private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, + private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid, Optional newDatafile) { var tablet = @@ -107,6 +113,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) .logInterval(Duration.ofMinutes(3)).createRetry(); + ArrayList filesToDeleteViaGc = new ArrayList<>(); + while (canCommitCompaction(ecid, tablet)) { CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); @@ -126,7 +134,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId } // make the needed updates to the tablet - updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); + filesToDeleteViaGc = updateTabletForCompaction(ctx, commitData.stats, ecid, tablet, + newDatafile, ecm, tabletMutator); tabletMutator.submit( tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid), @@ -156,13 +165,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId } } - return tablet; + return new CompactionFileResult(tablet, filesToDeleteViaGc); } - private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, - TabletMetadata tablet, Optional newDatafile, CompactionMetadata ecm, + private ArrayList updateTabletForCompaction(ServerContext ctx, + TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet, + Optional newDatafile, CompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) { + ArrayList filesToDeleteViaGc = new ArrayList<>(); + if (ecm.getKind() == CompactionKind.USER) { if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { // all files selected for the user compactions are finished, so the tablet is finish and @@ -211,13 +223,51 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio // scan ecm.getJobFiles().forEach(tabletMutator::putScan); } - ecm.getJobFiles().forEach(tabletMutator::deleteFile); + for (StoredTabletFile file : ecm.getJobFiles()) { + // Check if file is shared + DataFileValue fileMetadata = tablet.getFilesMap().get(file); + boolean isShared = fileMetadata != null && fileMetadata.isShared(); + + if (isShared) { + // File is shared - must use GC delete markers + LOG.debug("File {} is shared, will create GC delete marker", file.getFileName()); + filesToDeleteViaGc.add(file); + } else { + // File is not shared - try to delete directly + try { + Path filePath = new Path(file.getMetadataPath()); + boolean deleted = ctx.getVolumeManager().deleteRecursively(filePath); + + if (deleted) { + LOG.debug("Successfully deleted non-shared compaction input file: {}", + file.getFileName()); + } else { + LOG.warn("Failed to delete non-shared file {}, will create GC delete marker", + file.getFileName()); + filesToDeleteViaGc.add(file); + } + } catch (IOException e) { + LOG.warn("Error deleting non-shared file {}, will create GC delete marker: {}", + file.getFileName(), e.getMessage()); + filesToDeleteViaGc.add(file); + } + } + + // Always delete from tablet metadata + tabletMutator.deleteFile(file); + } + tabletMutator.deleteExternalCompaction(ecid); if (newDatafile.isPresent()) { - tabletMutator.putFile(newDatafile.orElseThrow(), - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + // NEW: Mark new compaction output files as not shared + DataFileValue newFileValue = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), + System.currentTimeMillis(), false // New file created by this tablet alone - not shared + ); + tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue); } + + return filesToDeleteViaGc; } public static boolean canCommitCompaction(ExternalCompactionId ecid, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java index 3933a625048..b244356046b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java @@ -18,27 +18,52 @@ */ package org.apache.accumulo.manager.compaction.coordinator.commit; +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.gc.ReferenceFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.manager.tableOps.AbstractFateOperation; import org.apache.accumulo.manager.tableOps.FateEnv; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PutGcCandidates extends AbstractFateOperation { private static final long serialVersionUID = 1L; private final CompactionCommitData commitData; private final String refreshLocation; + private final ArrayList filesToDeleteViaGc; + private static final Logger LOG = LoggerFactory.getLogger(PutGcCandidates.class); - public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) { + public PutGcCandidates(CompactionCommitData commitData, String refreshLocation, + ArrayList filesToDeleteViaGc) { this.commitData = commitData; this.refreshLocation = refreshLocation; + this.filesToDeleteViaGc = new ArrayList<>(); + for (StoredTabletFile file : filesToDeleteViaGc) { + this.filesToDeleteViaGc.add(file.getMetadataPath()); + } } @Override public Repo call(FateId fateId, FateEnv env) throws Exception { + if (filesToDeleteViaGc != null && !filesToDeleteViaGc.isEmpty()) { + var extent = KeyExtent.fromThrift(commitData.textent); + List deleteMarkers = new ArrayList<>(); - // add the GC candidates - env.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles()); + for (String filePath : filesToDeleteViaGc) { + StoredTabletFile file = new StoredTabletFile(filePath); + deleteMarkers.add(ReferenceFile.forFile(extent.tableId(), file)); + LOG.debug("Creating GC delete marker for file: {}", file.getFileName()); + } + + env.getContext().getAmple().putGcFileAndDirCandidates(extent.tableId(), deleteMarkers); + LOG.debug("Created {} GC delete markers for compaction", deleteMarkers.size()); + } if (refreshLocation == null) { env.recordCompactionCompletion(ExternalCompactionId.of(commitData.ecid)); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 88238036e38..06e3e56358e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -167,7 +167,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws this.loadingFiles = new HashMap<>(); } - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { Map toLoad = new HashMap<>(); for (var fileInfo : files) { @@ -245,16 +245,18 @@ void load(List tablets, Files files) { ReferencedTabletFile refTabFile = entry.getKey(); Bulk.FileInfo fileInfo = entry.getValue(); + boolean isShared = sharedFiles.contains(fileInfo.getFileName()); DataFileValue dfv; if (setTime) { // This should always be set outside the loop when setTime is true and should not be // null at this point Preconditions.checkState(fileTime != null); - dfv = - new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), fileTime); + dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), + fileTime, isShared); } else { - dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()); + dfv = + new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), isShared); } filesToLoad.put(refTabFile, dfv); @@ -402,6 +404,26 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, String fmtTid = fateId.getTxUUIDStr(); log.trace("{}: Started loading files at row: {}", fmtTid, startRow); + Map fileTabletCount = new HashMap<>(); + while (lmi.hasNext()) { + loadMapEntry = lmi.next(); + for (var fileInfo : loadMapEntry.getValue()) { + String fileName = fileInfo.getFileName(); + fileTabletCount.merge(fileName, 1, Integer::sum); + } + } + + Set sharedFiles = fileTabletCount.entrySet().stream().filter(e -> e.getValue() > 1) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + + log.debug("{}: Detected {} shared files out of {} total files", fmtTid, sharedFiles.size(), + fileTabletCount.size()); + + // Reset the iterator to start from the beginning + lmi = new PeekingIterator<>(loadMapIter); + loadMapEntry = lmi.peek(); + startRow = loadMapEntry.getKey().prevEndRow(); + loader.start(bulkDir, bulkInfo.tableId, fateId, bulkInfo.setTime); ImportTimingStats importTimingStats = new ImportTimingStats(); @@ -429,7 +451,7 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, } List tablets = findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats); - loader.load(tablets, loadMapEntry.getValue()); + loader.load(tablets, loadMapEntry.getValue(), sharedFiles); } } finally { tabletsMetadata.close(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index f3147165b16..c970f0c3b51 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -132,6 +132,7 @@ static Map> getNewTabletFiles(Fate Function fileInfoProvider) { Map> tabletsFiles = new TreeMap<>(); + Map fileTabletCount = new HashMap<>(); newTablets.keySet().forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); @@ -175,6 +176,8 @@ static Map> getNewTabletFiles(Fate double numOverlapping = newTablets.keySet().stream().map(KeyExtent::toDataRange).filter(overlapPredicate).count(); + fileTabletCount.put(file, (int) numOverlapping); + if (numOverlapping == 0) { log.debug("{} File {} with range {} that does not overlap tablet {}", fateId, file, fileRange, tabletMetadata.getExtent()); @@ -186,8 +189,9 @@ static Map> getNewTabletFiles(Fate // add the file to the tablets it overlaps newTablets.keySet().forEach(newTablet -> { if (overlapPredicate.apply(newTablet.toDataRange())) { + boolean isShared = numOverlapping > 1; DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet, - dataFileValue.getTime()); + dataFileValue.getTime(), isShared); tabletsFiles.get(newTablet).put(file, ndfv); } }); @@ -227,6 +231,8 @@ private void addNewTablets(FateId fateId, FateEnv env, TabletMetadata tabletMeta } var mutator = tabletsMutator.mutateTablet(newExtent).requireAbsentTablet(); + // dfv now includes shared status + newTabletsFiles.get(newExtent).forEach(mutator::putFile); mutator.putOperation(opid); mutator.putDirName(dirNameIter.next()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java index 3782572bacf..71fdc48ac4b 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo; @@ -95,7 +96,7 @@ void start(Path bulkDir, TableId tableId, FateId fateId, boolean setTime) throws } @Override - void load(List tablets, Files files) { + void load(List tablets, Files files, Set sharedFiles) { results.add(new LoadResult(tablets, files)); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 1c96bddbe70..c2bfdb473b3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -407,7 +407,7 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil Span span2 = TraceUtil.startSpan(this.getClass(), "minorCompact::bringOnline"); try (Scope scope = span2.makeCurrent()) { bringMinorCompactionOnline(tmpDatafile, newDatafile, - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), commitSession, + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false), commitSession, flushId, mincReason); } catch (Exception e) { final ServiceLock tserverLock = tabletServer.getLock(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java new file mode 100644 index 00000000000..dc4bad80286 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionFileDeleteIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(CompactionFileDeleteIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + /** + * Test that files created by minor compaction are marked as not shared + */ + @Test + public void testMinorCompactionCreatesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + + client.tableOperations().flush(tableName, null, null, true); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + Map filesSharedStatus = getFilesSharedStatus(client, tableId); + + assertFalse(filesSharedStatus.isEmpty(), "Expected at least one file after flush"); + filesSharedStatus.forEach((file, isShared) -> { + assertFalse(isShared, "File created by minor compaction should not be marked as shared: " + + file.getFileName()); + log.info("Verified file {} is not shared", file.getFileName()); + }); + } + } + + // /** + // * Test that files are appropriately marked as shared or not shared during tablet split + // */ + // @Test + // public void testSplitMarksFilesCorrectly() throws Exception { + // try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + // String tableName = getUniqueNames(1)[0]; + // client.tableOperations().create(tableName); + // TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + // + // try (BatchWriter bw = client.createBatchWriter(tableName)) { + // for (int i = 0; i < 1000; i++) { + // Mutation m = new Mutation(String.format("row%04d", i)); + // m.put("cf", "cq", "value" + i); + // bw.addMutation(m); + // } + // } + // client.tableOperations().flush(tableName, null, null, true); + // Map filesBeforeSplit = getFilesSharedStatus(client, tableId); + // log.info("Files before split: {}", filesBeforeSplit.size()); + // + // SortedSet splits = new TreeSet<>(); + // splits.add(new Text("row0500")); + // client.tableOperations().addSplits(tableName, splits); + // + // int attempts = 0; + // int tabletCount = 0; + // while (attempts < 30) { // 30 seconds max + // tabletCount = getFilesPerTablet(client, tableId).size(); + // if (tabletCount >= 2) { + // break; + // } + // Thread.sleep(1000); + // attempts++; + // } + // + // log.info("Split completed, found {} tablets after {} seconds", tabletCount, attempts); + // assertTrue(tabletCount >= 2, "Expected at least 2 tablets after split"); + // + // Map> tabletFiles = getFilesPerTablet(client, tableId); + // log.info("Number of tablets after split: {}", tabletFiles.size()); + // + // for (Map.Entry> tabletEntry : tabletFiles.entrySet()) { + // String tablet = tabletEntry.getKey(); + // Map files = tabletEntry.getValue(); + // + // log.info("Tablet: {} has {} files", tablet, files.size()); + // + // for (Map.Entry fileEntry : files.entrySet()) { + // StoredTabletFile file = fileEntry.getKey(); + // Boolean isShared = fileEntry.getValue(); + // long tabletCountWithFile = tabletFiles.values().stream() + // .filter(tabletFileMap -> tabletFileMap.containsKey(file)) + // .count(); + // + // log.info(" File: {}, Shared: {}, TabletCount: {}", + // file.getFileName(), isShared, tabletCountWithFile); + // + // if (tabletCountWithFile > 1) { + // assertTrue(isShared, + // "File " + file.getFileName() + " exists in " + tabletCountWithFile + // + " tablets and should be marked as shared"); + // } else { + // assertFalse(isShared, + // "File " + file.getFileName() + " exists in only 1 tablet and should NOT be shared"); + // } + // } + // } + // } + // } + + /** + * Test that files are marked as shared during table clone + */ + @Test + public void testCloneMarksFilesAsShared() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String[] names = getUniqueNames(2); + String sourceTable = names[0]; + String cloneTable = names[1]; + + client.tableOperations().create(sourceTable); + TableId sourceTableId = TableId.of(client.tableOperations().tableIdMap().get(sourceTable)); + + try (BatchWriter bw = client.createBatchWriter(sourceTable)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(sourceTable, null, null, true); + + Map sourceFilesBefore = getFilesSharedStatus(client, sourceTableId); + assertFalse(sourceFilesBefore.isEmpty(), "Expected files in source table"); + sourceFilesBefore + .forEach((file, isShared) -> assertFalse(isShared, "Source files should not be shared")); + + client.tableOperations().clone(sourceTable, cloneTable, true, null, null); + TableId cloneTableId = TableId.of(client.tableOperations().tableIdMap().get(cloneTable)); + + Map sourceFilesAfter = getFilesSharedStatus(client, sourceTableId); + sourceFilesAfter.forEach((file, isShared) -> { + assertTrue(isShared, + "Source file " + file.getFileName() + " should be marked as shared after clone"); + log.info("Verified source file {} is shared after clone", file.getFileName()); + }); + + Map cloneFiles = getFilesSharedStatus(client, cloneTableId); + cloneFiles.forEach((file, isShared) -> { + assertTrue(isShared, "Clone file " + file.getFileName() + " should be marked as shared"); + log.info("Verified clone file {} is shared", file.getFileName()); + }); + } + } + + /** + * Test that bulk import marks files as shared or not based on how many tablets they go to + */ + @Test + public void testBulkImportMarksFilesCorrectly() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + NewTableConfiguration ntc = new NewTableConfiguration(); + SortedSet splits = new TreeSet<>(); + splits.add(new Text("row0500")); + ntc.withSplits(splits); + client.tableOperations().create(tableName, ntc); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + + Map> tabletFiles = getFilesPerTablet(client, tableId); + assertEquals(2, tabletFiles.size(), "Expected 2 tablets"); + + tabletFiles.forEach((tablet, files) -> { + files.forEach((file, isShared) -> { + long tabletCount = tabletFiles.values().stream() + .filter(tabletFileMap -> tabletFileMap.containsKey(file)).count(); + + if (tabletCount == 1) { + assertFalse(isShared, "File in single tablet should not be shared"); + } + }); + }); + } + } + + /** + * Test that compaction processes non-shared files correctly + */ + @Test + public void testCompactionDeletesNonSharedFiles() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + + // Create multiple files + for (int batch = 0; batch < 4; batch++) { + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(String.format("row%04d", i)); + m.put("cf", "cq", "value_" + batch + "_" + i); + bw.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + } + + Map before = getFilesSharedStatus(client, tableId); + int beforeCount = before.size(); + log.info("Files before compaction: {}", beforeCount); + assertTrue(beforeCount >= 3, "Expected at least 3 files"); + before.forEach((f, s) -> assertFalse(s, "Files should not be shared")); + + // Compact + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Map after = getFilesSharedStatus(client, tableId); + int afterCount = after.size(); + log.info("Files after compaction: {}", afterCount); + + assertTrue(afterCount <= beforeCount, "Expected same or fewer files after compaction"); + after.forEach((f, s) -> assertFalse(s, "Compaction output should not be shared")); + + log.info("✓ Compaction test passed"); + } + } + + // /** + // * Test that shared files go through GC process when compacted + // */ + // @Test + // public void testCompactionCreatesGcMarkersForSharedFiles() throws Exception { + // try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + // String tableName = getUniqueNames(1)[0]; + // + // Map props = new HashMap<>(); + // props.put(Property.TABLE_MAJC_RATIO.getKey(), "10"); + // NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + // client.tableOperations().create(tableName, ntc); + // + // TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + // + // try (BatchWriter bw = client.createBatchWriter(tableName)) { + // for (int i = 0; i < 500; i++) { + // Mutation m = new Mutation(String.format("row%04d", i)); + // m.put("cf", "cq", "value" + i); + // bw.addMutation(m); + // } + // } + // client.tableOperations().flush(tableName, null, null, true); + // + // SortedSet splits = new TreeSet<>(); + // splits.add(new Text("row0250")); + // client.tableOperations().addSplits(tableName, splits); + // + // Map> tabletFilesBefore = + // getFilesPerTablet(client, tableId); + // + // StoredTabletFile sharedFile = null; + // for (Map files : tabletFilesBefore.values()) { + // for (Map.Entry entry : files.entrySet()) { + // if (entry.getValue()) { // isShared == true + // sharedFile = entry.getKey(); + // break; + // } + // } + // if (sharedFile != null) { + // break; + // } + // } + // + // if (sharedFile != null) { + // log.info("Found shared file: {}", sharedFile.getFileName()); + // client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + // Set gcCandidates = getGcFileCandidates(client, tableId); + // log.info("GC candidates after compaction: {}", gcCandidates); + // } + // } + // } + + /** + * Get all files and their shared status for a table + */ + private Map getFilesSharedStatus(AccumuloClient client, + TableId tableId) { + Map result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + result.put(file, dfv.isShared()); + + log.trace("File: {}, Size: {}, Entries: {}, Shared: {}", file.getFileName(), + dfv.getSize(), dfv.getNumEntries(), dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry", e); + } + } + } catch (Exception e) { + log.error("Error reading files for table {}", tableId, e); + } + + return result; + } + + /** + * Get files per tablet with their shared status + */ + private Map> getFilesPerTablet(AccumuloClient client, + TableId tableId) { + Map> result = new HashMap<>(); + + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME); + + String currentTablet = null; + Map currentFiles = new HashMap<>(); + + for (var entry : scanner) { + String row = entry.getKey().getRow().toString(); + String cf = entry.getKey().getColumnFamily().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (currentTablet == null || !currentTablet.equals(row)) { + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + currentTablet = row; + currentFiles = new HashMap<>(); + } + + if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.STR_NAME)) { + try { + StoredTabletFile file = new StoredTabletFile(cq); + DataFileValue dfv = new DataFileValue(value); + currentFiles.put(file, dfv.isShared()); + } catch (Exception e) { + log.warn("Error parsing file entry for tablet {}", row, e); + } + } + } + + if (currentTablet != null && !currentFiles.isEmpty()) { + result.put(currentTablet, currentFiles); + } + } catch (Exception e) { + log.error("Error reading files per tablet for table {}", tableId, e); + } + + return result; + } + + /** + * Get GC file candidates for a table + */ + private Set getGcFileCandidates(AccumuloClient client, TableId tableId) { + Set candidates = new HashSet<>(); + try (Scanner scanner = + client.createScanner(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY)) { + + scanner.setRange(MetadataSchema.DeletesSection.getRange()); + + scanner.forEach(entry -> { + try { + String row = entry.getKey().getRow().toString(); + String decodedPath = MetadataSchema.DeletesSection.decodeRow(row); + + String cq = entry.getKey().getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (cq.equals(tableId.canonical()) || value.equals(tableId.canonical())) { + candidates.add(decodedPath); + log.debug("Found GC delete marker for table {}: {}", tableId, decodedPath); + } + } catch (Exception e) { + log.warn("Error parsing GC delete marker entry", e); + } + }); + } catch (Exception e) { + log.warn("Error reading GC candidates", e); + } + return candidates; + } +} From f9417216decb83ac6061d1329b4ed96c4e4d1bc6 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 15 Jan 2026 13:55:03 -0500 Subject: [PATCH 2/3] removed unused tests --- .../functional/CompactionFileDeleteIT.java | 128 ------------------ 1 file changed, 128 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java index dc4bad80286..09c465eced0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionFileDeleteIT.java @@ -87,77 +87,6 @@ public void testMinorCompactionCreatesNonSharedFiles() throws Exception { } } - // /** - // * Test that files are appropriately marked as shared or not shared during tablet split - // */ - // @Test - // public void testSplitMarksFilesCorrectly() throws Exception { - // try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - // String tableName = getUniqueNames(1)[0]; - // client.tableOperations().create(tableName); - // TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); - // - // try (BatchWriter bw = client.createBatchWriter(tableName)) { - // for (int i = 0; i < 1000; i++) { - // Mutation m = new Mutation(String.format("row%04d", i)); - // m.put("cf", "cq", "value" + i); - // bw.addMutation(m); - // } - // } - // client.tableOperations().flush(tableName, null, null, true); - // Map filesBeforeSplit = getFilesSharedStatus(client, tableId); - // log.info("Files before split: {}", filesBeforeSplit.size()); - // - // SortedSet splits = new TreeSet<>(); - // splits.add(new Text("row0500")); - // client.tableOperations().addSplits(tableName, splits); - // - // int attempts = 0; - // int tabletCount = 0; - // while (attempts < 30) { // 30 seconds max - // tabletCount = getFilesPerTablet(client, tableId).size(); - // if (tabletCount >= 2) { - // break; - // } - // Thread.sleep(1000); - // attempts++; - // } - // - // log.info("Split completed, found {} tablets after {} seconds", tabletCount, attempts); - // assertTrue(tabletCount >= 2, "Expected at least 2 tablets after split"); - // - // Map> tabletFiles = getFilesPerTablet(client, tableId); - // log.info("Number of tablets after split: {}", tabletFiles.size()); - // - // for (Map.Entry> tabletEntry : tabletFiles.entrySet()) { - // String tablet = tabletEntry.getKey(); - // Map files = tabletEntry.getValue(); - // - // log.info("Tablet: {} has {} files", tablet, files.size()); - // - // for (Map.Entry fileEntry : files.entrySet()) { - // StoredTabletFile file = fileEntry.getKey(); - // Boolean isShared = fileEntry.getValue(); - // long tabletCountWithFile = tabletFiles.values().stream() - // .filter(tabletFileMap -> tabletFileMap.containsKey(file)) - // .count(); - // - // log.info(" File: {}, Shared: {}, TabletCount: {}", - // file.getFileName(), isShared, tabletCountWithFile); - // - // if (tabletCountWithFile > 1) { - // assertTrue(isShared, - // "File " + file.getFileName() + " exists in " + tabletCountWithFile - // + " tablets and should be marked as shared"); - // } else { - // assertFalse(isShared, - // "File " + file.getFileName() + " exists in only 1 tablet and should NOT be shared"); - // } - // } - // } - // } - // } - /** * Test that files are marked as shared during table clone */ @@ -253,7 +182,6 @@ public void testCompactionDeletesNonSharedFiles() throws Exception { client.tableOperations().create(tableName); TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); - // Create multiple files for (int batch = 0; batch < 4; batch++) { try (BatchWriter bw = client.createBatchWriter(tableName)) { for (int i = 0; i < 100; i++) { @@ -271,9 +199,7 @@ public void testCompactionDeletesNonSharedFiles() throws Exception { assertTrue(beforeCount >= 3, "Expected at least 3 files"); before.forEach((f, s) -> assertFalse(s, "Files should not be shared")); - // Compact client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - Map after = getFilesSharedStatus(client, tableId); int afterCount = after.size(); log.info("Files after compaction: {}", afterCount); @@ -281,63 +207,9 @@ public void testCompactionDeletesNonSharedFiles() throws Exception { assertTrue(afterCount <= beforeCount, "Expected same or fewer files after compaction"); after.forEach((f, s) -> assertFalse(s, "Compaction output should not be shared")); - log.info("✓ Compaction test passed"); } } - // /** - // * Test that shared files go through GC process when compacted - // */ - // @Test - // public void testCompactionCreatesGcMarkersForSharedFiles() throws Exception { - // try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - // String tableName = getUniqueNames(1)[0]; - // - // Map props = new HashMap<>(); - // props.put(Property.TABLE_MAJC_RATIO.getKey(), "10"); - // NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); - // client.tableOperations().create(tableName, ntc); - // - // TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); - // - // try (BatchWriter bw = client.createBatchWriter(tableName)) { - // for (int i = 0; i < 500; i++) { - // Mutation m = new Mutation(String.format("row%04d", i)); - // m.put("cf", "cq", "value" + i); - // bw.addMutation(m); - // } - // } - // client.tableOperations().flush(tableName, null, null, true); - // - // SortedSet splits = new TreeSet<>(); - // splits.add(new Text("row0250")); - // client.tableOperations().addSplits(tableName, splits); - // - // Map> tabletFilesBefore = - // getFilesPerTablet(client, tableId); - // - // StoredTabletFile sharedFile = null; - // for (Map files : tabletFilesBefore.values()) { - // for (Map.Entry entry : files.entrySet()) { - // if (entry.getValue()) { // isShared == true - // sharedFile = entry.getKey(); - // break; - // } - // } - // if (sharedFile != null) { - // break; - // } - // } - // - // if (sharedFile != null) { - // log.info("Found shared file: {}", sharedFile.getFileName()); - // client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); - // Set gcCandidates = getGcFileCandidates(client, tableId); - // log.info("GC candidates after compaction: {}", gcCandidates); - // } - // } - // } - /** * Get all files and their shared status for a table */ From a26e1c18333be2c7fdb8ca74b7e61a61d7b4451f Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Mon, 26 Jan 2026 13:21:32 -0500 Subject: [PATCH 3/3] updated tests --- .../core/metadata/schema/DataFileValue.java | 13 +++--- .../manager/tableOps/bulkVer2/LoadFiles.java | 29 ++++++++------ .../manager/tableOps/split/UpdateTablets.java | 2 - .../tableOps/split/UpdateTabletsTest.java | 40 +++++++++---------- 4 files changed, 43 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java index d2d265e2e18..45dc75d4611 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java @@ -66,16 +66,15 @@ public DataFileValue(String encodedDFV) { time = -1; shared = false; - if (ba.length >= 3) { - // Field 3 could be either time (old format) or shared (new format) - // Try to parse as time first (for backward compatibility) + if (ba.length == 3) { + // Could be old format with time, or new format with shared try { time = Long.parseLong(ba[2]); } catch (NumberFormatException e) { shared = Boolean.parseBoolean(ba[2]); } - } - if (ba.length == 4) { + } else if (ba.length == 4) { + shared = Boolean.parseBoolean(ba[2]); time = Long.parseLong(ba[3]); } } @@ -132,12 +131,12 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Long.valueOf(size + numEntries).hashCode(); + return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode(); } @Override public String toString() { - return size + " " + numEntries; + return size + " " + numEntries + " " + shared; } public void setTime(long time) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 06e3e56358e..3f1ebf9aa45 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -404,10 +404,19 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, String fmtTid = fateId.getTxUUIDStr(); log.trace("{}: Started loading files at row: {}", fmtTid, startRow); - Map fileTabletCount = new HashMap<>(); + List> allEntries = new ArrayList<>(); while (lmi.hasNext()) { - loadMapEntry = lmi.next(); - for (var fileInfo : loadMapEntry.getValue()) { + allEntries.add(lmi.next()); + } + + if (allEntries.isEmpty()) { + log.warn("{}: No files to load", fateId.getTxUUIDStr()); + return 0; + } + + Map fileTabletCount = new HashMap<>(); + for (var entry : allEntries) { + for (var fileInfo : entry.getValue()) { String fileName = fileInfo.getFileName(); fileTabletCount.merge(fileName, 1, Integer::sum); } @@ -419,10 +428,7 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, log.debug("{}: Detected {} shared files out of {} total files", fmtTid, sharedFiles.size(), fileTabletCount.size()); - // Reset the iterator to start from the beginning - lmi = new PeekingIterator<>(loadMapIter); - loadMapEntry = lmi.peek(); - startRow = loadMapEntry.getKey().prevEndRow(); + startRow = allEntries.get(0).getKey().prevEndRow(); loader.start(bulkDir, bulkInfo.tableId, fateId, bulkInfo.setTime); @@ -432,12 +438,11 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow); try { PeekingIterator pi = new PeekingIterator<>(tabletsMetadata.iterator()); - while (lmi.hasNext()) { - loadMapEntry = lmi.next(); + for (var entry : allEntries) { // If the user set the TABLE_BULK_SKIP_THRESHOLD property, then only look // at the next skipDistance tablets before recreating the iterator if (skipDistance > 0) { - final KeyExtent loadMapKey = loadMapEntry.getKey(); + final KeyExtent loadMapKey = entry.getKey(); if (!pi.findWithin( tm -> PREV_COMP.compare(tm.getPrevEndRow(), loadMapKey.prevEndRow()) >= 0, skipDistance)) { @@ -450,8 +455,8 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, } } List tablets = - findOverlappingTablets(fmtTid, loadMapEntry.getKey(), pi, importTimingStats); - loader.load(tablets, loadMapEntry.getValue(), sharedFiles); + findOverlappingTablets(fmtTid, entry.getKey(), pi, importTimingStats); + loader.load(tablets, entry.getValue(), sharedFiles); } } finally { tabletsMetadata.close(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index c970f0c3b51..0497cf3da02 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -231,8 +231,6 @@ private void addNewTablets(FateId fateId, FateEnv env, TabletMetadata tabletMeta } var mutator = tabletsMutator.mutateTablet(newExtent).requireAbsentTablet(); - // dfv now includes shared status - newTabletsFiles.get(newExtent).forEach(mutator::putFile); mutator.putOperation(opid); mutator.putDirName(dirNameIter.next()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index 0e242f75e04..c707b6a7293 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -135,14 +135,14 @@ public void testFileParitioning() { var firstAndLastKeys = Map.of(file2, newFileInfo("m", "r"), file3, newFileInfo("g", "x"), file4, newFileInfo("s", "v")); - var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100)); - var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20), file2, - new DataFileValue(1000, 100, 50), file3, new DataFileValue(1000, 100)); - var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20), file3, - new DataFileValue(1000, 100), file4, new DataFileValue(4000, 400)); - var ke4Expected = - Map.of(file1, new DataFileValue(250, 25, 20), file3, new DataFileValue(1000, 100)); + var ke1Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke2Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file2, + new DataFileValue(1000, 100, 50, true), file3, new DataFileValue(1000, 100, true)); + var ke3Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true), file4, new DataFileValue(4000, 400, false)); + var ke4Expected = Map.of(file1, new DataFileValue(250, 25, 20, true), file3, + new DataFileValue(1000, 100, true)); var expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -165,9 +165,9 @@ public void testFileParitioning() { // Test a tablet with no files going to it var tabletFiles2 = Map.of(file2, tabletFiles.get(file2), file4, tabletFiles.get(file4)); - ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50)); - ke3Expected = Map.of(file4, new DataFileValue(4000, 400)); + ke1Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke2Expected = Map.of(file2, new DataFileValue(1000, 100, 50, true)); + ke3Expected = Map.of(file4, new DataFileValue(4000, 400, false)); ke4Expected = Map.of(); expected = Map.of(ke1, ke1Expected, ke2, ke2Expected, ke3, ke3Expected, ke4, ke4Expected); @@ -209,10 +209,10 @@ public void testManyColumns() throws Exception { var flid2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); var loaded = Map.of(loaded1, flid1, loaded2, flid2); - var dfv1 = new DataFileValue(1000, 100, 20); - var dfv2 = new DataFileValue(500, 50, 20); - var dfv3 = new DataFileValue(4000, 400); - var dfv4 = new DataFileValue(2000, 200); + var dfv1 = new DataFileValue(1000, 100, 20, true); + var dfv2 = new DataFileValue(500, 50, 20, false); + var dfv3 = new DataFileValue(4000, 400, false); + var dfv4 = new DataFileValue(2000, 200, true); var tabletFiles = Map.of(file1, dfv1, file2, dfv2, file3, dfv3, file4, dfv4); @@ -307,7 +307,7 @@ public void testManyColumns() throws Exception { .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet1Mutator); - EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet1Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet1Mutator); EasyMock.expect(tablet1Mutator.putFile(file2, dfv2)).andReturn(tablet1Mutator); // SplitInfo marked as system generated so should be set to ALWAYS (0 delay) @@ -340,10 +340,10 @@ public void testManyColumns() throws Exception { .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putBulkFile(loaded2.getTabletFile(), flid2)) .andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet2Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet2Mutator); EasyMock.expect(tablet2Mutator.putFile(file3, dfv3)).andReturn(tablet2Mutator); - EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet2Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet2Mutator); tablet2Mutator.submit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); @@ -356,9 +356,9 @@ public void testManyColumns() throws Exception { EasyMock.expect(tablet3Mutator.requireAbsentLogs()).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.putPrevEndRow(newExtent3.prevEndRow())) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20))) + EasyMock.expect(tablet3Mutator.putFile(file1, new DataFileValue(333, 33, 20, true))) .andReturn(tablet3Mutator); - EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100))) + EasyMock.expect(tablet3Mutator.putFile(file4, new DataFileValue(1000, 100, true))) .andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file2)).andReturn(tablet3Mutator); EasyMock.expect(tablet3Mutator.deleteFile(file3)).andReturn(tablet3Mutator);