Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,54 @@ public class DataFileValue {
private final long size;
private final long numEntries;
private long time = -1;
private boolean shared;

/**
 * Creates a value with every field supplied explicitly.
 *
 * @param size value stored in the {@code size} field
 * @param numEntries value stored in the {@code numEntries} field
 * @param time value stored in the {@code time} field
 * @param shared value stored in the {@code shared} field
 */
public DataFileValue(long size, long numEntries, long time, boolean shared) {
  this.shared = shared;
  this.time = time;
  this.numEntries = numEntries;
  this.size = size;
}

/**
 * Creates a value without a time; the {@code time} field is set to {@code -1}.
 *
 * @param size value stored in the {@code size} field
 * @param numEntries value stored in the {@code numEntries} field
 * @param shared value stored in the {@code shared} field
 */
public DataFileValue(long size, long numEntries, boolean shared) {
  this(size, numEntries, -1, shared);
}

/**
 * Creates a value whose {@code shared} field is {@code false}.
 *
 * @param size value stored in the {@code size} field
 * @param numEntries value stored in the {@code numEntries} field
 * @param time value stored in the {@code time} field
 */
public DataFileValue(long size, long numEntries, long time) {
  this(size, numEntries, time, false);
}

/**
 * Creates a value with no time ({@code time} is {@code -1}) and with {@code shared} set to
 * {@code false}.
 *
 * @param size value stored in the {@code size} field
 * @param numEntries value stored in the {@code numEntries} field
 */
public DataFileValue(long size, long numEntries) {
  this(size, numEntries, -1, false);
}

public DataFileValue(String encodedDFV) {
String[] ba = encodedDFV.split(",");

size = Long.parseLong(ba[0]);
numEntries = Long.parseLong(ba[1]);
time = -1;
shared = false;

if (ba.length == 3) {
time = Long.parseLong(ba[2]);
} else {
time = -1;
// Could be old format with time, or new format with shared
try {
time = Long.parseLong(ba[2]);
} catch (NumberFormatException e) {
shared = Boolean.parseBoolean(ba[2]);
}
} else if (ba.length == 4) {
shared = Boolean.parseBoolean(ba[2]);
time = Long.parseLong(ba[3]);
}
}

Expand All @@ -74,15 +99,19 @@ public long getTime() {
return time;
}

/**
 * @return the current value of the {@code shared} field
 */
public boolean isShared() {
  return this.shared;
}

/**
 * Encodes this value as bytes.
 *
 * @return the UTF-8 bytes of the string produced by {@link #encodeAsString()}
 */
public byte[] encode() {
  String encoded = encodeAsString();
  return encoded.getBytes(UTF_8);
}

public String encodeAsString() {
if (time >= 0) {
return ("" + size + "," + numEntries + "," + time);
return ("" + size + "," + numEntries + "," + shared + "," + time);
}
return ("" + size + "," + numEntries);
return ("" + size + "," + numEntries + "," + shared);
}

public Value encodeAsValue() {
Expand All @@ -93,20 +122,21 @@ public Value encodeAsValue() {
public boolean equals(Object o) {
if (o instanceof DataFileValue odfv) {

return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time;
return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time
&& shared == odfv.shared;
}

return false;
}

@Override
public int hashCode() {
return Long.valueOf(size + numEntries).hashCode();
return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode();
}

@Override
public String toString() {
return size + " " + numEntries;
return size + " " + numEntries + " " + shared;
}

public void setTime(long time) {
Expand All @@ -116,6 +146,10 @@ public void setTime(long time) {
this.time = time;
}

/**
 * Overwrites the {@code shared} field of this instance.
 *
 * @param shared the new value of the {@code shared} field
 */
public void setShared(boolean shared) {
this.shared = shared;
}

/**
* @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon
return new Pair<>(result, sizes);
}

/**
 * Walks the tablets of the source table and, for every file whose {@link DataFileValue} is not
 * yet flagged as shared, queues a metadata update that rewrites the value with the same size,
 * entry count and time but with {@code shared} set to {@code true}. The batch writer is flushed
 * once all tablets have been visited.
 *
 * <p>
 * NOTE(review): the updates are plain mutations added to a {@link BatchWriter}, so nothing here
 * verifies that a file is still present in the tablet, or still has the value that was read,
 * when the write is applied - confirm whether a conditional mutation is required.
 *
 * @param srcTableId table whose tablet files are rewritten
 * @param tableId id of the clone; only used in the log message
 * @param client passed through to {@code createCloneScanner}
 * @param bw writer that receives one mutation per tablet that had at least one unshared file
 * @throws MutationsRejectedException propagated from the batch writer
 */
private static void markSourceFilesAsShared(TableId srcTableId, TableId tableId,
    AccumuloClient client, BatchWriter bw) throws MutationsRejectedException {

  log.info("Marking source table {} files as shared after clone to {}", srcTableId, tableId);

  try (TabletsMetadata sourceTablets = createCloneScanner(null, srcTableId, client)) {
    for (TabletMetadata sourceTablet : sourceTablets) {

      // Nothing to rewrite for a tablet that has no files.
      if (sourceTablet.getFiles().isEmpty()) {
        continue;
      }

      Mutation update = new Mutation(sourceTablet.getExtent().toMetaRow());
      int rewrittenFiles = 0;

      for (var fileEntry : sourceTablet.getFilesMap().entrySet()) {
        StoredTabletFile storedFile = fileEntry.getKey();
        DataFileValue currentValue = fileEntry.getValue();

        // Files that already carry the flag are left untouched.
        if (currentValue.isShared()) {
          continue;
        }

        // Same size, entry count and time as before; only the shared flag changes.
        DataFileValue flaggedValue =
            new DataFileValue(currentValue.getSize(), currentValue.getNumEntries(),
                currentValue.isTimeSet() ? currentValue.getTime() : -1, true);

        update.put(DataFileColumnFamily.NAME, storedFile.getMetadataText(),
            flaggedValue.encodeAsValue());
        rewrittenFiles++;

        log.debug("Marking file {} as shared in source tablet {}", storedFile.getFileName(),
            sourceTablet.getExtent());
      }

      // Only submit the mutation when at least one column was added to it.
      if (rewrittenFiles > 0) {
        bw.addMutation(update);
      }
    }
  }

  bw.flush();
  log.info("Finished marking source table {} files as shared", srcTableId);
}

private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
Iterable<Entry<Key,Value>> tablet) {

Expand All @@ -191,7 +233,13 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
if (!cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
}
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());

DataFileValue ogVal = new DataFileValue(entry.getValue().get());
DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(),
ogVal.isTimeSet() ? ogVal.getTime() : -1, true);

// FIXED: Use newSharedVal instead of original value
m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue());
} else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) {
Expand Down Expand Up @@ -380,6 +428,8 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId
}
}

markSourceFilesAsShared(srcTableId, tableId, context, bw);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking the source tablets as shared here has race conditions: these may not be the files in the clone. Also, marking them as shared must be done using a conditional writer that only changes the marking if the file exists in the tablet with the same value (this check must be done by the conditional mutation).

The following conditions must be true for clone to avoid race conditions.

  1. Files are marked as shared before cloning.
  2. After copying the tablet's files, they must still exist in the source tablet.

Need to rework the code to implement the following algorithm for each tablet.

  1. Before cloning, mark all files in the source tablet as shared using a conditional mutation.
  2. Copy the source tablet's files to the destination tablet.
  3. Read the source and destination tablets and ensure the destination tablet has a subset of the source's files. If so, this tablet was successfully cloned and we know the GC did not delete any files because the source still has them. This tablet is done and the next step is not needed.
  4. Otherwise, there was a concurrent change to the source tablet's files, and it is possible that the GC even deleted some of them. Delete the destination tablet and go back to step 1.

The current code does this without step 1. Also, the current code batches tablet reads/writes for efficiency. The code would probably be much easier to understand without the batching, but doing one tablet at a time without batching would be really slow. So the code needs to be modified to work in step 1 while still doing the batching.


// delete the clone markers and create directory entries
Scanner mscanner =
context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -67,6 +69,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
this.newDatafile = newDatafile;
}

/**
 * Pairs the tablet metadata returned by {@code commitCompaction} with the list of files that
 * are handed to {@code PutGcCandidates} for deletion via the garbage collector.
 */
private record CompactionFileResult(TabletMetadata tabletMetadata,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
var ecid = ExternalCompactionId.of(commitData.ecid);
Expand All @@ -79,25 +85,25 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
// process died and now its running again. In this case commit should do nothing, but its
// important to still carry on with the rest of the steps after commit. This code ignores the
// fact that a commit may not have happened in the current call and continues for this reason.
TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile);
CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile);

String loc = null;
if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
loc = tabletMetadata.getLocation().getHostPortSession();
if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) {
loc = fileResult.tabletMetadata.getLocation().getHostPortSession();
}

// This will cause the tablet to be reexamined to see if it needs any more compactions.
var extent = KeyExtent.fromThrift(commitData.textent);
env.getEventPublisher().event(extent, "Compaction completed %s", extent);

return new PutGcCandidates(commitData, loc);
return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc());
}

/**
 * @return the extent built from the Thrift extent held in {@code commitData.textent}
 */
KeyExtent getExtent() {
  var thriftExtent = commitData.textent;
  return KeyExtent.fromThrift(thriftExtent);
}

private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
Optional<ReferencedTabletFile> newDatafile) {

var tablet =
Expand All @@ -107,6 +113,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);

Expand All @@ -126,7 +134,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}

// make the needed updates to the tablet
updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator);
filesToDeleteViaGc = updateTabletForCompaction(ctx, commitData.stats, ecid, tablet,
newDatafile, ecm, tabletMutator);

tabletMutator.submit(
tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid),
Expand Down Expand Up @@ -156,13 +165,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}
}

return tablet;
return new CompactionFileResult(tablet, filesToDeleteViaGc);
}

private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
private ArrayList<StoredTabletFile> updateTabletForCompaction(ServerContext ctx,
TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet,
Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
Ample.ConditionalTabletMutator tabletMutator) {

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

if (ecm.getKind() == CompactionKind.USER) {
if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
// all files selected for the user compactions are finished, so the tablet is finish and
Expand Down Expand Up @@ -211,13 +223,51 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio
// scan
ecm.getJobFiles().forEach(tabletMutator::putScan);
}
ecm.getJobFiles().forEach(tabletMutator::deleteFile);
for (StoredTabletFile file : ecm.getJobFiles()) {
// Check if file is shared
DataFileValue fileMetadata = tablet.getFilesMap().get(file);
boolean isShared = fileMetadata != null && fileMetadata.isShared();

if (isShared) {
// File is shared - must use GC delete markers
LOG.debug("File {} is shared, will create GC delete marker", file.getFileName());
filesToDeleteViaGc.add(file);
} else {
// File is not shared - try to delete directly
try {
Path filePath = new Path(file.getMetadataPath());
boolean deleted = ctx.getVolumeManager().deleteRecursively(filePath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleting files here is not correct because the conditional mutation to the metadata table has not yet been made. Only after successfully making the conditional mutation will we know what was shared or not. Need to do something like the following.

  1. Modify conditional mutation to check that the file values (this checks the new shared field) are the same for the tablet. This atomically ensures that what is thought to be shared is actually shared when the metadata table update happens. Currently the conditional mutation only checks the set of files, not their values.
  2. After successfully submitting the conditional mutation, look at the table metadata and the list of files compacting (from commitData). Compute a set of files that is in commitData.inputPaths and is marked as not shared in the tablet metadata. If a file is not in tablet metadata, add it to the set because its shared status is unknown (this can happen because of failure and retry). This is the set of compacting files that are known for sure to not be shared; in the case of failure and retry this set should be empty.
  3. If the conditional mutation was not successful, this set above should be empty.
  4. Pass the set computed above to PutGcCandidates
  5. In PutGcCandidates delete non shared files and add gc delete markers for shared files. Doing the actual file deletes in PutGcCandidates makes reasoning about retries (caused by process dying in middle of fate step) much easier.

There is existing code for comparing the files' values, but it's not exactly what we need. Would probably need to create a new requireFiles(Map<StoredTabletFile, DataFileValue> files) method that uses code similar to this in its impl.


if (deleted) {
LOG.debug("Successfully deleted non-shared compaction input file: {}",
file.getFileName());
} else {
LOG.warn("Failed to delete non-shared file {}, will create GC delete marker",
file.getFileName());
filesToDeleteViaGc.add(file);
}
} catch (IOException e) {
LOG.warn("Error deleting non-shared file {}, will create GC delete marker: {}",
file.getFileName(), e.getMessage());
filesToDeleteViaGc.add(file);
}
}

// Always delete from tablet metadata
tabletMutator.deleteFile(file);
}

tabletMutator.deleteExternalCompaction(ecid);

if (newDatafile.isPresent()) {
tabletMutator.putFile(newDatafile.orElseThrow(),
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
// NEW: Mark new compaction output files as not shared
DataFileValue newFileValue = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(),
System.currentTimeMillis(), false // New file created by this tablet alone - not shared
);
tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue);
}

return filesToDeleteViaGc;
}

public static boolean canCommitCompaction(ExternalCompactionId ecid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,52 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PutGcCandidates extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
private final CompactionCommitData commitData;
private final String refreshLocation;
private final ArrayList<String> filesToDeleteViaGc;
private static final Logger LOG = LoggerFactory.getLogger(PutGcCandidates.class);

public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) {
public PutGcCandidates(CompactionCommitData commitData, String refreshLocation,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
this.commitData = commitData;
this.refreshLocation = refreshLocation;
this.filesToDeleteViaGc = new ArrayList<>();
for (StoredTabletFile file : filesToDeleteViaGc) {
this.filesToDeleteViaGc.add(file.getMetadataPath());
}
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
if (filesToDeleteViaGc != null && !filesToDeleteViaGc.isEmpty()) {
var extent = KeyExtent.fromThrift(commitData.textent);
List<ReferenceFile> deleteMarkers = new ArrayList<>();

// add the GC candidates
env.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles());
for (String filePath : filesToDeleteViaGc) {
StoredTabletFile file = new StoredTabletFile(filePath);
deleteMarkers.add(ReferenceFile.forFile(extent.tableId(), file));
LOG.debug("Creating GC delete marker for file: {}", file.getFileName());
}

env.getContext().getAmple().putGcFileAndDirCandidates(extent.tableId(), deleteMarkers);
LOG.debug("Created {} GC delete markers for compaction", deleteMarkers.size());
}

if (refreshLocation == null) {
env.recordCompactionCompletion(ExternalCompactionId.of(commitData.ecid));
Expand Down
Loading