Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -973,14 +973,14 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException
try {
// Get the CRR via RSEndpoint for cluster 1
ClusterRoleRecord roleRecord = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(),
info.getName(), this, pollerInterval, properties);
info.getUrl2(), info.getUrl1(), info.getName(), this, pollerInterval, properties);
// If we have unknown role for any cluster then try getting CRR from cluster 2 endpoint and if
// we get unknown role from there as well then CRR with higher adminVersion wins.
if (roleRecord.hasUnknownRole()) {
ClusterRoleRecord roleRecordFromPR;
try {
roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(),
info.getName(), this, pollerInterval, properties);
roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(),
info.getUrl2(), info.getUrl2(), info.getName(), this, pollerInterval, properties);
} catch (Exception e) {
// As we were able to get CRR from cluster 1 but cluster 2 threw exception then just
// return
Expand Down Expand Up @@ -1009,16 +1009,16 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException
== SQLExceptionCode.CLUSTER_ROLE_RECORD_NOT_FOUND.getErrorCode()
) {
try {
return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(),
this, pollerInterval, properties);
return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(),
info.getUrl2(), info.getName(), this, pollerInterval, properties);
} catch (Exception ignoredEx) {
throw (SQLException) e;
}
}

// If caught exception is not CRR not found, then just try cluster 2 endpoint.
return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(), this,
pollerInterval, properties);
return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(),
info.getUrl2(), info.getName(), this, pollerInterval, properties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
Expand All @@ -52,13 +53,22 @@ public class GetClusterRoleRecordUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);

/**
* Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord
* Per-HA-group scheduler executors, keyed on haGroupName so multiple HA groups can run pollers
* independently without sharing or overwriting each other's lifecycle state.
*/
private static Map<String, ScheduledExecutorService> schedulerMap = new ConcurrentHashMap<>();
private static final Map<String, ScheduledExecutorService> schedulerMap =
new ConcurrentHashMap<>();

private static final Object pollerLock = new Object();
/**
* Per-HA-group poller futures, keyed on haGroupName. A previous implementation kept a single
* static {@code pollerFuture} field that was overwritten by each new {@code schedulePoller}
* call regardless of haGroupName, so cancelling the poller for one HA group would cancel the
* future most recently scheduled (which may belong to a different HA group). Keying by
* haGroupName ensures each group's future is cancelled independently.
*/
private static final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

private static volatile ScheduledFuture<?> pollerFuture = null;
private static final Object pollerLock = new Object();

private GetClusterRoleRecordUtil() {
}
Expand Down Expand Up @@ -131,48 +141,65 @@ private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroup
* until we get an Active ClusterRoleRecord (one role should be Active) if we receive an Active
* roleRecord then client this method will return the roleRecord to be consumed and used, if not
* then it will start a poller and return non-active roleRecord.
* @param url URL of the RegionServer Endpoint Service
* <p>
* The poller alternates between {@code url1} and {@code url2} on successive ticks so a transient
* outage on one cluster does not stall progress; both URLs are passed in even though the initial
* fetch only targets one of them (selected by the caller via the {@code primaryUrl} hint).
* @param url1 URL of the RegionServer Endpoint Service for cluster 1
* @param url2 URL of the RegionServer Endpoint Service for cluster 2
* @param primaryUrl URL to use for the initial (non-poller) fetch; must be either url1 or
* url2
* @param haGroupName Name of the HA group
* @param properties Connection properties
* @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
* @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an
* Active CRR is found
* @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord
* @param properties Connection properties
* @throws SQLException if there is an error getting the ClusterRoleRecord
*/
public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName,
HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException {
ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties);
public static ClusterRoleRecord fetchClusterRoleRecord(String url1, String url2,
String primaryUrl, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval,
Properties properties) throws SQLException {
ClusterRoleRecord clusterRoleRecord =
getClusterRoleRecord(primaryUrl, haGroupName, true, properties);
if (
clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER
&& !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive()
) {
LOGGER.info(
"Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds,"
+ "until we find an ACTIVE CRR",
"Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} ms,"
+ " alternating between url1 and url2 until we find an ACTIVE CRR",
haGroupName, pollerInterval);
// Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value)
// Schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds
// until we get an Active ClusterRoleRecord and return the Non-Active CRR
schedulePoller(url, haGroupName, haGroup, pollerInterval, properties);
schedulePoller(url1, url2, haGroupName, haGroup, pollerInterval, properties);
}

return clusterRoleRecord;
}

/**
* Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval seconds until we
* get an Active ClusterRoleRecord, poller will only start if client will receive, a Non-Active
* roleRecord (means either of the roles are not Active and client can't create a connection)
* @param url URL of the RegionServer Endpoint Service
* Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds until
* we get an Active ClusterRoleRecord. Poller will only start if client received a Non-Active
* roleRecord (means neither role is Active and client can't create a connection).
* <p>
* The poller alternates between {@code url1} and {@code url2} on successive ticks. Alternating
* (rather than pinning to a single URL) avoids stalling if the chosen cluster's RegionServer
* Endpoint is transiently unreachable while the peer cluster is healthy and may already hold
* the Active role.
* <p>
* Each haGroupName gets its own entry in {@link #futureMap} and {@link #schedulerMap}. Multiple
* HA groups can therefore run independent pollers; cancellation of one HA group's poller does
* not interfere with another's lifecycle.
* @param url1 URL of the RegionServer Endpoint Service for cluster 1
* @param url2 URL of the RegionServer Endpoint Service for cluster 2
* @param haGroupName Name of the HA group
* @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an
* Active CRR is found
* @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
* @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord
* @param properties Connection properties
* @throws SQLException if there is an error getting or refreshing the ClusterRoleRecord when an
* Active CRR is found
*/
private static void schedulePoller(String url, String haGroupName, HighAvailabilityGroup haGroup,
long pollerInterval, Properties properties) {
private static void schedulePoller(String url1, String url2, String haGroupName,
HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) {

synchronized (pollerLock) {
if (schedulerMap.containsKey(haGroupName) && !schedulerMap.get(haGroupName).isShutdown()) {
Expand All @@ -181,34 +208,47 @@ private static void schedulePoller(String url, String haGroupName, HighAvailabil
}

schedulerMap.put(haGroupName, Executors.newScheduledThreadPool(1));
LOGGER.info("Starting poller for HA group {} to check every {} milliseconds.", haGroupName,
pollerInterval);
LOGGER.info(
"Starting poller for HA group {} to check every {} milliseconds, alternating between {}"
+ " and {}.",
haGroupName, pollerInterval, url1, url2);
AtomicLong tickCount = new AtomicLong(0);
Runnable pollingTask = () -> {
// Increment unconditionally so a failed tick still alternates next iteration.
long tick = tickCount.getAndIncrement();
String tickUrl = selectUrlForTick(url1, url2, tick);
try {
ClusterRoleRecord polledCrr = getClusterRoleRecord(url, haGroupName, true, properties);
LOGGER.info("Polled CRR: {}", polledCrr);
ClusterRoleRecord polledCrr =
getClusterRoleRecord(tickUrl, haGroupName, true, properties);
LOGGER.info("Polled CRR for HA group {} via {}: {}", haGroupName, tickUrl, polledCrr);
if (polledCrr.getRole1().isActive() || polledCrr.getRole2().isActive()) {

LOGGER.info("Active ClusterRoleRecord found. Cancelling poller.");
LOGGER.info("Active ClusterRoleRecord found for HA group {}. Cancelling poller.",
haGroupName);
synchronized (pollerLock) {
if (pollerFuture != null) {
pollerFuture.cancel(false);
ScheduledFuture<?> future = futureMap.remove(haGroupName);
if (future != null) {
future.cancel(false);
}
// Refresh ClusterRoleRecord for the HAGroup with appropriate transition
haGroup.refreshClusterRoleRecord(true);
schedulerMap.get(haGroupName).shutdown();
schedulerMap.remove(haGroupName);
ScheduledExecutorService scheduler = schedulerMap.remove(haGroupName);
if (scheduler != null) {
scheduler.shutdown();
}
}
}
} catch (SQLException e) {
LOGGER.error("Exception found while polling for ClusterRoleRecord on {}: {}", url,
e.getMessage());
LOGGER.error("Exception found while polling for ClusterRoleRecord on {} for HA group"
+ " {}: {}", tickUrl, haGroupName, e.getMessage());
}
};

// Schedule the task with a fixed delay
pollerFuture = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, 0,
pollerInterval, TimeUnit.MILLISECONDS);
// Schedule the task with a fixed delay; keyed by haGroupName so each HA group's future is
// independently cancellable.
ScheduledFuture<?> future = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask,
0, pollerInterval, TimeUnit.MILLISECONDS);
futureMap.put(haGroupName, future);
}
}

Expand Down Expand Up @@ -241,4 +281,28 @@ private static Connection getConnection(String url, Properties properties) throw
propsCopy.remove(PHOENIX_HA_GROUP_ATTR);
return DriverManager.getConnection(url, propsCopy);
}

/**
* Pick which URL the poller should target on tick {@code tick}. Even ticks (0, 2, 4, ...)
* select {@code url1}; odd ticks select {@code url2}. Package-private for unit-test access.
*/
static String selectUrlForTick(String url1, String url2, long tick) {
return (tick % 2 == 0) ? url1 : url2;
}

/**
* Test-only accessor for the per-HA-group future map. Package-private so unit tests can verify
* that distinct HA groups produce distinct entries (and that cancellation removes only the
* corresponding entry).
*/
static Map<String, ScheduledFuture<?>> getFutureMapForTesting() {
return futureMap;
}

/**
* Test-only accessor for the per-HA-group scheduler map. Package-private for unit-test use.
*/
static Map<String, ScheduledExecutorService> getSchedulerMapForTesting() {
return schedulerMap;
}
}
Loading