Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Strings;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ActiveClusterSuffixProtos;

Expand All @@ -31,18 +35,27 @@
*/
@InterfaceAudience.Private
public class ActiveClusterSuffix {
private final String active_cluster_suffix;
private final String cluster_id;
private final String suffix;

/**
 * New ActiveClusterSuffix from the given cluster id and suffix.
 */
public ActiveClusterSuffix(final String ci, final String suffix) {
this.cluster_id = ci;
this.suffix = suffix;
}

public ActiveClusterSuffix(final String cs) {
this.active_cluster_suffix = cs;
public ActiveClusterSuffix(final String input) {
String[] parts = input.split(":", 2);
this.cluster_id = parts[0];
if (parts.length > 1) {
this.suffix = parts[1];
} else {
this.suffix = "";
}
}

public String getActiveClusterSuffix() {
return active_cluster_suffix;
public static ActiveClusterSuffix fromConfig(Configuration conf, ClusterId clusterId) {
return new ActiveClusterSuffix(clusterId.toString(), conf
.get(HConstants.HBASE_META_TABLE_SUFFIX, HConstants.HBASE_META_TABLE_SUFFIX_DEFAULT_VALUE));
}

/** Returns The active cluster suffix serialized using pb w/ pb magic prefix */
Expand Down Expand Up @@ -77,22 +90,34 @@ public static ActiveClusterSuffix parseFrom(final byte[] bytes) throws Deseriali

/** Returns A pb instance to represent this instance. */
public ActiveClusterSuffixProtos.ActiveClusterSuffix convert() {
ActiveClusterSuffixProtos.ActiveClusterSuffix.Builder builder =
ActiveClusterSuffixProtos.ActiveClusterSuffix.newBuilder();
return builder.setActiveClusterSuffix(this.active_cluster_suffix).build();
return ActiveClusterSuffixProtos.ActiveClusterSuffix.newBuilder().setClusterId(cluster_id)
.setSuffix(suffix).build();
}

/** Returns A {@link ActiveClusterSuffix} made from the passed in <code>cs</code> */
public static ActiveClusterSuffix
convert(final ActiveClusterSuffixProtos.ActiveClusterSuffix cs) {
return new ActiveClusterSuffix(cs.getActiveClusterSuffix());
return new ActiveClusterSuffix(cs.getClusterId(), cs.getSuffix());
}

/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return this.active_cluster_suffix;
return String.format("%s:%s", this.cluster_id,
Strings.isNullOrEmpty(this.suffix) ? "<blank>" : this.suffix);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ActiveClusterSuffix that = (ActiveClusterSuffix) o;
return Objects.equals(cluster_id, that.cluster_id) && Objects.equals(suffix, that.suffix);
}

@Override
public int hashCode() {
return Objects.hash(cluster_id, suffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public static TableName getDefaultNameOfMetaForReplica() {
public static TableName initializeHbaseMetaTableName(Configuration conf) {
String suffix_val = conf.get(HConstants.HBASE_META_TABLE_SUFFIX,
HConstants.HBASE_META_TABLE_SUFFIX_DEFAULT_VALUE);
LOG.info("Meta table suffix value: {}", suffix_val);
LOG.debug("[Read-replica feature] suffix value: {}",
(suffix_val == null || suffix_val.isEmpty()) ? "<blank>" : suffix_val);
if (Strings.isNullOrEmpty(suffix_val)) {
return valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "meta");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ option optimize_for = SPEED;
* Content of the '/hbase/active_cluster_suffix.id' file to indicate the active cluster.
*/
message ActiveClusterSuffix {
// This is the id of the active cluster (derived from the ClusterId, not user-configured), as a String
required string cluster_id = 1;

// This is the active cluster suffix set by the user in the config, as a String
required string active_cluster_suffix = 1;
required string suffix = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -395,31 +393,38 @@ public void logFileSystemState(Logger log) throws IOException {
}

private void negotiateActiveClusterSuffixFile(long wait) throws IOException {
this.activeClusterSuffix = ActiveClusterSuffix.fromConfig(conf, getClusterId());
if (!isReadOnlyModeEnabled(conf)) {
try {
// verify the contents against the config set
ActiveClusterSuffix acs = FSUtils.getActiveClusterSuffix(fs, rootdir);
LOG.debug("File Suffix {} : Configured suffix {} : Cluster ID : {}", acs,
getSuffixFromConfig(), getClusterId());
if (Objects.equals(acs.getActiveClusterSuffix(), getSuffixFromConfig())) {
this.activeClusterSuffix = acs;
} else {
LOG.debug(
"Negotiating active cluster suffix file. File {} : File Suffix {} : Configured suffix {} : Cluster ID : {}",
new Path(rootdir, HConstants.ACTIVE_CLUSTER_SUFFIX_FILE_NAME), acs, activeClusterSuffix,
getClusterId());
// Suffix file exists and we're in read/write mode. Content should match.
if (!this.activeClusterSuffix.equals(acs)) {
// throw error
LOG.info("rootdir {} : Active Cluster File Suffix {} ", rootdir, acs);
LOG.info(
"[Read-replica feature] Another cluster is running in active (read-write) mode on this "
+ "storage location. Active cluster ID: {}, This cluster ID {}. Rootdir location {} ",
acs, activeClusterSuffix, rootdir);
throw new IOException("Cannot start master, because another cluster is running in active "
+ "(read-write) mode on this storage location. Active Cluster Id: {} " + acs
+ " This cluster Id: " + getClusterId());
+ "(read-write) mode on this storage location. Active Cluster Id: " + acs
+ ", This cluster Id: " + activeClusterSuffix);
}
LOG.info(
"This is the active cluster on this storage location, " + "File Suffix {} : Suffix {} : ",
acs, getActiveClusterSuffix());
"[Read-replica feature] This is the active cluster on this storage location with cluster id: {}",
activeClusterSuffix);
} catch (FileNotFoundException fnfe) {
// this is the active cluster, create active cluster suffix file if it does not exist
FSUtils.setActiveClusterSuffix(fs, rootdir, computeAndSetSuffixFileDataToWrite(), wait);
// We're in read/write mode, but suffix file missing, let's create it
FSUtils.setActiveClusterSuffix(fs, rootdir, activeClusterSuffix, wait);
LOG.info("[Read-replica feature] Created Active cluster suffix file: {}, with content: {}",
HConstants.ACTIVE_CLUSTER_SUFFIX_FILE_NAME, activeClusterSuffix);
}
} else {
// this is a replica cluster
LOG.info("Replica cluster is being started in Read Only Mode");
// This is a read-only cluster, don't care about suffix file
LOG.info("[Read-replica feature] Replica cluster is being started in Read Only Mode");
}
}

Expand All @@ -431,25 +436,4 @@ private boolean isReadOnlyModeEnabled(Configuration conf) {
return conf.getBoolean(HConstants.HBASE_GLOBAL_READONLY_ENABLED_KEY,
HConstants.HBASE_GLOBAL_READONLY_ENABLED_DEFAULT);
}

private String getActiveClusterSuffixFromConfig(Configuration conf) {
return conf.get(HConstants.HBASE_META_TABLE_SUFFIX,
HConstants.HBASE_META_TABLE_SUFFIX_DEFAULT_VALUE);
}

public String getSuffixFromConfig() {
return getClusterId().toString() + ":" + getActiveClusterSuffixFromConfig(conf);
}

// Used only for testing
public byte[] getSuffixFileDataToCompare() {
String str = this.activeClusterSuffix.toString();
return str.getBytes(StandardCharsets.UTF_8);
}

public byte[] computeAndSetSuffixFileDataToWrite() {
String str = getClusterId().toString() + ":" + getActiveClusterSuffixFromConfig(conf);
this.activeClusterSuffix = new ActiveClusterSuffix(str);
return str.getBytes(StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ActiveClusterSuffix;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.FSUtils;
Expand Down Expand Up @@ -64,9 +64,10 @@ public static void manageActiveClusterIdFile(boolean readOnlyEnabled, MasterFile
LOG.debug("Global read-only mode is being ENABLED. Deleting active cluster file: {}",
activeClusterFile);
try (FSDataInputStream in = fs.open(activeClusterFile)) {
String actualClusterFileData = IOUtils.toString(in, StandardCharsets.UTF_8);
String expectedClusterFileData = mfs.getSuffixFromConfig();
if (actualClusterFileData.equals(expectedClusterFileData)) {
ActiveClusterSuffix actualClusterFileData =
ActiveClusterSuffix.parseFrom(in.readAllBytes());
ActiveClusterSuffix expectedClusterFileData = mfs.getActiveClusterSuffix();
if (expectedClusterFileData.equals(actualClusterFileData)) {
fs.delete(activeClusterFile, false);
LOG.info("Successfully deleted active cluster file: {}", activeClusterFile);
} else {
Expand All @@ -84,13 +85,14 @@ public static void manageActiveClusterIdFile(boolean readOnlyEnabled, MasterFile
"Failed to delete active cluster file: {}. "
+ "Read-only flag will be updated, but file system state is inconsistent.",
activeClusterFile, e);
} catch (DeserializationException e) {
LOG.error("Failed to deserialize ActiveClusterSuffix from file {}", activeClusterFile, e);
}
} else {
// DISABLING READ-ONLY (true -> false), create the active cluster file id file
int wait = mfs.getConfiguration().getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
if (!fs.exists(activeClusterFile)) {
FSUtils.setActiveClusterSuffix(fs, rootDir, mfs.computeAndSetSuffixFileDataToWrite(),
wait);
FSUtils.setActiveClusterSuffix(fs, rootDir, mfs.getActiveClusterSuffix(), wait);
} else {
LOG.debug("Active cluster file already exists at: {}. No need to create it again.",
activeClusterFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -642,19 +641,19 @@ public static ActiveClusterSuffix getActiveClusterSuffix(FileSystem fs, Path roo
data = in.readUTF();
cs = new ActiveClusterSuffix(data);
} catch (EOFException eof) {
LOG.warn("Active Cluster Suffix File {} is empty ", idPath);
LOG.warn("[Read-replica Feature] Active Cluster id file {} is empty ", idPath);
} finally {
in.close();
}
rewriteAsPb(fs, rootdir, idPath, cs);
}
return cs;
} else {
throw new FileNotFoundException("Active Cluster Suffix File " + idPath + " not found");
throw new FileNotFoundException(
"[Read-replica feature] Active Cluster Suffix File " + idPath + " not found");
}
}

/**
 * Rewrites the legacy cluster id file {@code p} as a pb-serialized file: the old file is moved
 * aside, the new pb file is written, then the moved-aside copy is deleted.
 */
private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
final ClusterId cid) throws IOException {
// Rewrite the file as pb. Move aside the old one first, write new
Expand All @@ -668,6 +667,19 @@ private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final P
LOG.debug("Rewrote the hbase.id file as pb");
}

private static void rewriteAsPb(final FileSystem fs, final Path rootdir, final Path p,
final ActiveClusterSuffix cs) throws IOException {
// Rewrite the file as pb. Move aside the old one first, write new
// then delete the moved-aside file.
Path movedAsideName = new Path(p + "." + EnvironmentEdgeManager.currentTime());
if (!fs.rename(p, movedAsideName)) throw new IOException("Failed rename of " + p);
setActiveClusterSuffix(fs, rootdir, cs, 100);
if (!fs.delete(movedAsideName, false)) {
throw new IOException("Failed delete of " + movedAsideName);
}
LOG.debug("Rewrote the active.cluster.suffix.id file as pb");
}

/**
* Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
* directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
Expand All @@ -693,16 +705,15 @@ public static void setClusterId(final FileSystem fs, final Path rootdir,
* HBase root directory. If any operations on the ID file fails, and {@code wait} is a positive
* value, the method will retry to produce the ID file until the thread is forcibly interrupted.
*/

public static void setActiveClusterSuffix(final FileSystem fs, final Path rootdir, byte[] bdata,
final long wait) throws IOException {
public static void setActiveClusterSuffix(final FileSystem fs, final Path rootdir,
final ActiveClusterSuffix cs, final long wait) throws IOException {
final Path idFile = new Path(rootdir, HConstants.ACTIVE_CLUSTER_SUFFIX_FILE_NAME);
final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
final Path tempIdFile = new Path(tempDir, HConstants.ACTIVE_CLUSTER_SUFFIX_FILE_NAME);
String fsuffix = new String(bdata, StandardCharsets.US_ASCII);

LOG.debug("Create Active cluster Suffix file [{}] with Suffix: {}", idFile, fsuffix);
writeClusterInfo(fs, rootdir, idFile, tempIdFile, bdata, wait);
LOG.debug("[Read-replica feature] id file [{}] is present and contains cluster id: {}", idFile,
cs);
writeClusterInfo(fs, rootdir, idFile, tempIdFile, cs.toByteArray(), wait);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ActiveClusterSuffix;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand Down Expand Up @@ -83,12 +85,14 @@ public void testActiveClusterSuffixCreated() throws Exception {
assertTrue(filePath + " should not be empty ", fs.getFileStatus(filePath).getLen() > 0);

MasterFileSystem mfs = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem();
// Compute string using currently set suffix and the cluster id
String cluster_suffix1 =
new String(mfs.getSuffixFileDataToCompare(), StandardCharsets.US_ASCII);
// Compute string member variable
String cluster_suffix2 = mfs.getActiveClusterSuffix().toString();
assertEquals(cluster_suffix1, cluster_suffix2);

try (FSDataInputStream in = fs.open(filePath)) {
ActiveClusterSuffix suffixFromFile = ActiveClusterSuffix.parseFrom(in.readAllBytes());
ActiveClusterSuffix suffixFromConfig =
ActiveClusterSuffix.fromConfig(TEST_UTIL.getConfiguration(), mfs.getClusterId());
assertEquals("Active Cluster Suffix file content doesn't match configuration", suffixFromFile,
suffixFromConfig);
}
}

@Test
Expand All @@ -112,14 +116,15 @@ public void testSuffixFileOnRestart() throws Exception {
}

MasterFileSystem mfs = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem();

// Compute using file contents
String cluster_suffix1 =
new String(mfs.getSuffixFileDataToCompare(), StandardCharsets.US_ASCII);
ActiveClusterSuffix cluster_suffix1 = mfs.getActiveClusterSuffix();
// Compute using config
String cluster_suffix2 = mfs.getSuffixFromConfig();
ActiveClusterSuffix cluster_suffix2 =
ActiveClusterSuffix.fromConfig(TEST_UTIL.getConfiguration(), new ClusterId(clusterId));

assertEquals(cluster_suffix1, cluster_suffix2);
assertEquals(cluster_suffix, cluster_suffix1);
assertEquals(cluster_suffix, cluster_suffix1.toString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
import org.apache.hadoop.hbase.security.access.MasterReadOnlyController;
import org.apache.hadoop.hbase.security.access.RegionReadOnlyController;
import org.apache.hadoop.hbase.security.access.RegionServerReadOnlyController;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ CoprocessorTests.class, SmallTests.class })
public class TestCoprocessorConfigurationUtil {

private Configuration conf;
Expand Down