Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.split.FileRangeCache;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.tableOps.FateEnv;
Expand Down Expand Up @@ -557,12 +558,17 @@ ManagerGoalState getManagerGoalState() {
}

private Splitter splitter;
private FileRangeCache fileRangeCache;

/** Returns the splitter that finds and seeds tablet split operations; created at startup. */
@Override
public Splitter getSplitter() {
return splitter;
}

/** Returns the cache of data-file first/last rows used when computing tablet splits. */
@Override
public FileRangeCache getSplitFileCache() {
return fileRangeCache;
}

/** Returns the current upgrade status as reported by the upgrade coordinator. */
public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
return upgradeCoordinator.getStatus();
}
Expand Down Expand Up @@ -1118,6 +1124,7 @@ boolean canSuspendTablets() {

this.splitter = new Splitter(this);
this.splitter.start();
this.fileRangeCache = new FileRangeCache(context);

try {
Predicate<ZooUtil.LockID> isLockHeld =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.split;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;

/**
 * A cache of data files' first and last rows. Determining a file's row range requires opening the
 * file, so results are cached (keyed on table id plus file) for reuse by tablet split code.
 */
public class FileRangeCache {

  private static final Logger LOG = LoggerFactory.getLogger(FileRangeCache.class);

  /** Cache key: a file is scoped to its table because the table's config affects how it is read. */
  private record CacheKey(TableId tableId, TabletFile tabletFile) {
  }

  /**
   * Attempts to read the first and last row of each of the given data files.
   *
   * <p>
   * This is best effort: a file that cannot be read is logged at warn and omitted from the
   * returned map rather than failing the whole batch.
   *
   * @param context used to resolve the filesystem for each file
   * @param tableConf supplies the crypto service and reader configuration
   * @param dataFiles the files to inspect
   * @return map from file to its row range; unreadable files are absent from the map
   */
  public static <T extends TabletFile> Map<T,FileSKVIterator.FileRange> tryToGetFirstAndLastRows(
      ServerContext context, TableConfiguration tableConf, Set<T> dataFiles) {

    HashMap<T,FileSKVIterator.FileRange> dataFilesInfo = new HashMap<>();

    long t1 = System.currentTimeMillis();

    for (T dataFile : dataFiles) {
      FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath());
      // try-with-resources guarantees the reader is closed even if getFileRange() throws
      try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
          .forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService())
          .withTableConfiguration(tableConf).build()) {
        dataFilesInfo.put(dataFile, reader.getFileRange());
      } catch (IOException ioe) {
        // best effort: skip this file and continue with the rest
        LOG.warn("Failed to read data file to determine first and last key : {}", dataFile, ioe);
      }
    }

    long t2 = System.currentTimeMillis();

    String message = String.format("Found first and last keys for %d data files in %6.2f secs",
        dataFiles.size(), (t2 - t1) / 1000.0);
    // elevate the timing message to debug only when reading was slow enough to be interesting
    if (t2 - t1 > 500) {
      LOG.debug(message);
    } else {
      LOG.trace(message);
    }

    return dataFilesInfo;
  }

  // Weight-bounded loading cache of per-file row ranges; bounds are set in the constructor.
  final LoadingCache<CacheKey,FileSKVIterator.FileRange> splitFileCache;

  public FileRangeCache(ServerContext context) {
    // Approximate an entry's memory footprint: lengths of the key's strings plus, for non-empty
    // files, the lengths of the cached start and end keys.
    Weigher<CacheKey,FileSKVIterator.FileRange> weigher = (key, frange) -> key.tableId.canonical()
        .length() + key.tabletFile.getPath().toString().length()
        + (frange.empty ? 0
            : frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength());

    // Loads a single file's range on cache miss. NOTE(review): when the file cannot be read,
    // tryToGetFirstAndLastRows omits it, so this loader returns null and Caffeine treats the
    // load as unsuccessful — getCachedFileInfo then returns null; callers must tolerate that.
    CacheLoader<CacheKey,FileSKVIterator.FileRange> loader = key -> {
      TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
      return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
          .get(key.tabletFile);
    };

    splitFileCache = context.getCaches().createNewBuilder(Caches.CacheName.SPLITTER_FILES, true)
        .expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
        .build(loader);
  }

  /**
   * Gets the cached row range for a file, reading it from the file on a cache miss.
   *
   * @return the file's row range, or null if the file could not be read
   */
  public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
    return splitFileCache.get(new CacheKey(tableId, tabletFile));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,23 @@

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.split.FindSplits;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;

public class Splitter {

private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
Expand Down Expand Up @@ -115,104 +101,13 @@ private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> split
}
}

public static <T extends TabletFile> Map<T,FileSKVIterator.FileRange> tryToGetFirstAndLastRows(
ServerContext context, TableConfiguration tableConf, Set<T> dataFiles) {

HashMap<T,FileSKVIterator.FileRange> dataFilesInfo = new HashMap<>();

long t1 = System.currentTimeMillis();

for (T dataFile : dataFiles) {

FileSKVIterator reader = null;
FileSystem ns = context.getVolumeManager().getFileSystemByPath(dataFile.getPath());
try {
reader = FileOperations.getInstance().newReaderBuilder()
.forFile(dataFile, ns, ns.getConf(), tableConf.getCryptoService())
.withTableConfiguration(tableConf).build();

dataFilesInfo.put(dataFile, reader.getFileRange());
} catch (IOException ioe) {
LOG.warn("Failed to read data file to determine first and last key : " + dataFile, ioe);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ioe) {
LOG.warn("failed to close " + dataFile, ioe);
}
}
}

}

long t2 = System.currentTimeMillis();

String message = String.format("Found first and last keys for %d data files in %6.2f secs",
dataFiles.size(), (t2 - t1) / 1000.0);
if (t2 - t1 > 500) {
LOG.debug(message);
} else {
LOG.trace(message);
}

return dataFilesInfo;
}

private static class CacheKey {

final TableId tableId;
final TabletFile tabletFile;

public CacheKey(TableId tableId, TabletFile tabletFile) {
this.tableId = tableId;
this.tabletFile = tabletFile;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(tableId, cacheKey.tableId)
&& Objects.equals(tabletFile, cacheKey.tabletFile);
}

@Override
public int hashCode() {
return Objects.hash(tableId, tabletFile);
}

}

final LoadingCache<CacheKey,FileSKVIterator.FileRange> splitFileCache;

public Splitter(Manager manager) {
this.manager = manager;
ServerContext context = manager.getContext();

this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1)
.numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build();

Weigher<CacheKey,FileSKVIterator.FileRange> weigher = (key, frange) -> key.tableId.canonical()
.length() + key.tabletFile.getPath().toString().length()
+ (frange.empty ? 0
: frange.rowRange.getStartKey().getLength() + frange.rowRange.getEndKey().getLength());

CacheLoader<CacheKey,FileSKVIterator.FileRange> loader = key -> {
TableConfiguration tableConf = context.getTableConfiguration(key.tableId);
return tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile))
.get(key.tabletFile);
};

splitFileCache = context.getCaches().createNewBuilder(CacheName.SPLITTER_FILES, true)
.expireAfterAccess(10, TimeUnit.MINUTES).maximumWeight(10_000_000L).weigher(weigher)
.build(loader);

}

public synchronized void start() {
Expand All @@ -223,10 +118,6 @@ public synchronized void stop() {
splitExecutor.shutdownNow();
}

public FileSKVIterator.FileRange getCachedFileInfo(TableId tableId, TabletFile tabletFile) {
return splitFileCache.get(new CacheKey(tableId, tabletFile));
}

public void initiateSplit(KeyExtent extent) {
// Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste
// work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.EventPublisher;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.split.FileRangeCache;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tables.TableManager;
Expand Down Expand Up @@ -55,7 +55,7 @@ public interface FateEnv {

ExecutorService getTabletRefreshThreadPool();

Splitter getSplitter();
FileRangeCache getSplitFileCache();

ExecutorService getRenamePool();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
var newTablets = splitInfo.getTablets();

var newTabletsFiles = getNewTabletFiles(fateId, newTablets, tabletMetadata,
file -> env.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));
file -> env.getSplitFileCache().getCachedFileInfo(splitInfo.getOriginal().tableId(), file));

addNewTablets(fateId, env, tabletMetadata, opid, newTablets, newTabletsFiles);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.split.FileRangeCache;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -237,12 +237,16 @@ public void testManyColumns() throws Exception {
EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
Ample ample = EasyMock.mock(Ample.class);
EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
Splitter splitter = EasyMock.mock(Splitter.class);
EasyMock.expect(splitter.getCachedFileInfo(tableId, file1)).andReturn(newFileInfo("a", "z"));
EasyMock.expect(splitter.getCachedFileInfo(tableId, file2)).andReturn(newFileInfo("a", "b"));
EasyMock.expect(splitter.getCachedFileInfo(tableId, file3)).andReturn(newFileInfo("d", "f"));
EasyMock.expect(splitter.getCachedFileInfo(tableId, file4)).andReturn(newFileInfo("d", "j"));
EasyMock.expect(manager.getSplitter()).andReturn(splitter).atLeastOnce();
FileRangeCache fileRangeCache = EasyMock.mock(FileRangeCache.class);
EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file1))
.andReturn(newFileInfo("a", "z"));
EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file2))
.andReturn(newFileInfo("a", "b"));
EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file3))
.andReturn(newFileInfo("d", "f"));
EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file4))
.andReturn(newFileInfo("d", "j"));
EasyMock.expect(manager.getSplitFileCache()).andReturn(fileRangeCache).atLeastOnce();
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS))
.atLeastOnce();

Expand Down Expand Up @@ -389,8 +393,8 @@ public void testManyColumns() throws Exception {
tabletsMutator.close();
EasyMock.expectLastCall().anyTimes();

EasyMock.replay(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator,
tablet2Mutator, tablet3Mutator, cr, compactions);
EasyMock.replay(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator,
tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
// Now we can actually test the split code that writes the new tablets with a bunch columns in
// the original tablet
SortedSet<Text> splits = new TreeSet<>(List.of(newExtent1.endRow(), newExtent2.endRow()));
Expand All @@ -399,8 +403,8 @@ public void testManyColumns() throws Exception {
List.of(dir1, dir2));
updateTablets.call(fateId, manager);

EasyMock.verify(manager, context, ample, tabletMeta, splitter, tabletsMutator, tablet1Mutator,
tablet2Mutator, tablet3Mutator, cr, compactions);
EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, tabletsMutator,
tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
}

@Test
Expand Down