diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 77c7e75c134..6b9728e7970 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -973,14 +973,14 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException try { // Get the CRR via RSEndpoint for cluster 1 ClusterRoleRecord roleRecord = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), - info.getName(), this, pollerInterval, properties); + info.getUrl2(), info.getUrl1(), info.getName(), this, pollerInterval, properties); // If we have unknown role for any cluster then try getting CRR from cluster 2 endpoint and if // we get unknown role from there as well then CRR with higher adminVersion wins. if (roleRecord.hasUnknownRole()) { ClusterRoleRecord roleRecordFromPR; try { - roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), - info.getName(), this, pollerInterval, properties); + roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), + info.getUrl2(), info.getUrl2(), info.getName(), this, pollerInterval, properties); } catch (Exception e) { // As we were able to get CRR from cluster 1 but cluster 2 threw exception then just // return @@ -1009,16 +1009,16 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException == SQLExceptionCode.CLUSTER_ROLE_RECORD_NOT_FOUND.getErrorCode() ) { try { - return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(), - this, pollerInterval, properties); + return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(), + info.getUrl2(), info.getName(), this, pollerInterval, properties); } catch (Exception ignoredEx) { throw (SQLException) e; } } // If caught exception is not CRR not found, then just try cluster 2 endpoint. - return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(), this, - pollerInterval, properties); + return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(), + info.getUrl2(), info.getName(), this, pollerInterval, properties); } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 4e43c72feb3..bb66f7a1c66 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Admin; import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; @@ -52,13 +53,22 @@ public class GetClusterRoleRecordUtil { private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class); /** - * Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord + * Per-HA-group scheduler executors, keyed on haGroupName so multiple HA groups can run pollers + * independently without sharing or overwriting each other's lifecycle state. */ - private static Map schedulerMap = new ConcurrentHashMap<>(); + private static final Map schedulerMap = + new ConcurrentHashMap<>(); - private static final Object pollerLock = new Object(); + /** + * Per-HA-group poller futures, keyed on haGroupName. A previous implementation kept a single + * static {@code pollerFuture} field that was overwritten by each new {@code schedulePoller} + * call regardless of haGroupName, so cancelling the poller for one HA group would cancel the + * future most recently scheduled (which may belong to a different HA group). Keying by + * haGroupName ensures each group's future is cancelled independently. + */ + private static final Map> futureMap = new ConcurrentHashMap<>(); - private static volatile ScheduledFuture pollerFuture = null; + private static final Object pollerLock = new Object(); private GetClusterRoleRecordUtil() { } @@ -131,48 +141,65 @@ private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroup * until we get an Active ClusterRoleRecord (one role should be Active) if we receive an Active * roleRecord then client this method will return the roleRecord to be consumed and used, if not * then it will start a poller and return non-active roleRecord. - * @param url URL of the RegionServer Endpoint Service + *

+ * The poller alternates between {@code url1} and {@code url2} on successive ticks so a transient + * outage on one cluster does not stall progress; both URLs are passed in even though the initial + * fetch only targets one of them (selected by the caller via the {@code primaryUrl} hint). + * @param url1 URL of the RegionServer Endpoint Service for cluster 1 + * @param url2 URL of the RegionServer Endpoint Service for cluster 2 + * @param primaryUrl URL to use for the initial (non-poller) fetch; must be either url1 or + * url2 * @param haGroupName Name of the HA group - * @param properties Connection properties - * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an * Active CRR is found + * @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord + * @param properties Connection properties * @throws SQLException if there is an error getting the ClusterRoleRecord */ - public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName, - HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException { - ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties); + public static ClusterRoleRecord fetchClusterRoleRecord(String url1, String url2, + String primaryUrl, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, + Properties properties) throws SQLException { + ClusterRoleRecord clusterRoleRecord = + getClusterRoleRecord(primaryUrl, haGroupName, true, properties); if ( clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER && !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive() ) { LOGGER.info( - "Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds," - + "until we find an ACTIVE CRR", + "Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} ms," + + " alternating between url1 and url2 until we find an ACTIVE CRR", haGroupName, pollerInterval); - // Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value) + // Schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds // until we get an Active ClusterRoleRecord and return the Non-Active CRR - schedulePoller(url, haGroupName, haGroup, pollerInterval, properties); + schedulePoller(url1, url2, haGroupName, haGroup, pollerInterval, properties); } return clusterRoleRecord; } /** - * Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval seconds until we - * get an Active ClusterRoleRecord, poller will only start if client will receive, a Non-Active - * roleRecord (means either of the roles are not Active and client can't create a connection) - * @param url URL of the RegionServer Endpoint Service + * Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds until + * we get an Active ClusterRoleRecord. Poller will only start if client received a Non-Active + * roleRecord (means neither role is Active and client can't create a connection). + *

+ * The poller alternates between {@code url1} and {@code url2} on successive ticks. Alternating + * (rather than pinning to a single URL) avoids stalling if the chosen cluster's RegionServer + * Endpoint is transiently unreachable while the peer cluster is healthy and may already hold + * the Active role. + *

+ * Each haGroupName gets its own entry in {@link #futureMap} and {@link #schedulerMap}. Multiple + * HA groups can therefore run independent pollers; cancellation of one HA group's poller does + * not interfere with another's lifecycle. + * @param url1 URL of the RegionServer Endpoint Service for cluster 1 + * @param url2 URL of the RegionServer Endpoint Service for cluster 2 * @param haGroupName Name of the HA group * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an * Active CRR is found - * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord + * @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord * @param properties Connection properties - * @throws SQLException if there is an error getting or refreshing the ClusterRoleRecord when an - * Active CRR is found */ - private static void schedulePoller(String url, String haGroupName, HighAvailabilityGroup haGroup, - long pollerInterval, Properties properties) { + private static void schedulePoller(String url1, String url2, String haGroupName, + HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) { synchronized (pollerLock) { if (schedulerMap.containsKey(haGroupName) && !schedulerMap.get(haGroupName).isShutdown()) { @@ -181,34 +208,47 @@ private static void schedulePoller(String url, String haGroupName, HighAvailabil } schedulerMap.put(haGroupName, Executors.newScheduledThreadPool(1)); - LOGGER.info("Starting poller for HA group {} to check every {} milliseconds.", haGroupName, - pollerInterval); + LOGGER.info( + "Starting poller for HA group {} to check every {} milliseconds, alternating between {}" + + " and {}.", + haGroupName, pollerInterval, url1, url2); + AtomicLong tickCount = new AtomicLong(0); Runnable pollingTask = () -> { + // Increment unconditionally so a failed tick still alternates next iteration. + long tick = tickCount.getAndIncrement(); + String tickUrl = selectUrlForTick(url1, url2, tick); try { - ClusterRoleRecord polledCrr = getClusterRoleRecord(url, haGroupName, true, properties); - LOGGER.info("Polled CRR: {}", polledCrr); + ClusterRoleRecord polledCrr = + getClusterRoleRecord(tickUrl, haGroupName, true, properties); + LOGGER.info("Polled CRR for HA group {} via {}: {}", haGroupName, tickUrl, polledCrr); if (polledCrr.getRole1().isActive() || polledCrr.getRole2().isActive()) { - LOGGER.info("Active ClusterRoleRecord found. Cancelling poller."); + LOGGER.info("Active ClusterRoleRecord found for HA group {}. Cancelling poller.", + haGroupName); synchronized (pollerLock) { - if (pollerFuture != null) { - pollerFuture.cancel(false); + ScheduledFuture future = futureMap.remove(haGroupName); + if (future != null) { + future.cancel(false); } // Refresh ClusterRoleRecord for the HAGroup with appropriate transition haGroup.refreshClusterRoleRecord(true); - schedulerMap.get(haGroupName).shutdown(); - schedulerMap.remove(haGroupName); + ScheduledExecutorService scheduler = schedulerMap.remove(haGroupName); + if (scheduler != null) { + scheduler.shutdown(); + } } } } catch (SQLException e) { - LOGGER.error("Exception found while polling for ClusterRoleRecord on {}: {}", url, - e.getMessage()); + LOGGER.error("Exception found while polling for ClusterRoleRecord on {} for HA group" + + " {}: {}", tickUrl, haGroupName, e.getMessage()); } }; - // Schedule the task with a fixed delay - pollerFuture = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, 0, - pollerInterval, TimeUnit.MILLISECONDS); + // Schedule the task with a fixed delay; keyed by haGroupName so each HA group's future is + // independently cancellable. + ScheduledFuture future = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, + 0, pollerInterval, TimeUnit.MILLISECONDS); + futureMap.put(haGroupName, future); } } @@ -241,4 +281,28 @@ private static Connection getConnection(String url, Properties properties) throw propsCopy.remove(PHOENIX_HA_GROUP_ATTR); return DriverManager.getConnection(url, propsCopy); } + + /** + * Pick which URL the poller should target on tick {@code tick}. Even ticks (0, 2, 4, ...) + * select {@code url1}; odd ticks select {@code url2}. Package-private for unit-test access. + */ + static String selectUrlForTick(String url1, String url2, long tick) { + return (tick % 2 == 0) ? url1 : url2; + } + + /** + * Test-only accessor for the per-HA-group future map. Package-private so unit tests can verify + * that distinct HA groups produce distinct entries (and that cancellation removes only the + * corresponding entry). + */ + static Map> getFutureMapForTesting() { + return futureMap; + } + + /** + * Test-only accessor for the per-HA-group scheduler map. Package-private for unit-test use. + */ + static Map getSchedulerMapForTesting() { + return schedulerMap; + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java new file mode 100644 index 00000000000..c8ebe40dbc6 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link GetClusterRoleRecordUtil}. These tests cover two regressions in the + * non-active CRR poller infrastructure: + * + *

    + *
  1. Per-HA-group future tracking: a previous implementation kept a single static + * {@code pollerFuture} field that was overwritten by every {@code schedulePoller} invocation + * regardless of HA group, so cancelling one HA group's poller would target whichever future was + * scheduled most recently — possibly belonging to a different HA group. The fix keys both the + * scheduler executor and the future on {@code haGroupName} via concurrent maps.
  2. + *
  3. URL alternation each tick: a previous implementation pinned each scheduled poller to a + * single URL passed in at schedule time. If that cluster's RegionServer Endpoint became + * transiently unreachable, the poller could never observe the peer cluster's CRR even after + * the peer became Active. The fix has the poller alternate between url1 and url2 on each tick. + *
  4. + *
+ */ +public class GetClusterRoleRecordUtilTest { + + private static final String URL_1 = "phoenix+rpc:cluster1.example.com:2181"; + private static final String URL_2 = "phoenix+rpc:cluster2.example.com:2181"; + private static final String HA_GROUP_A = "haGroupA"; + private static final String HA_GROUP_B = "haGroupB"; + + @Before + public void clearStateBefore() { + GetClusterRoleRecordUtil.getFutureMapForTesting().clear(); + GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear(); + } + + @After + public void clearStateAfter() { + // Ensure nothing leaks across tests; the maps are static class state. + GetClusterRoleRecordUtil.getFutureMapForTesting().clear(); + GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear(); + } + + /** + * URL-alternation core: even ticks pick url1, odd ticks pick url2. This verifies the helper + * that the poller calls each tick to choose its target URL. + */ + @Test + public void testSelectUrlForTickAlternates() { + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 0L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 2L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 3L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 100L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 101L)); + } + + /** + * Defensive: large tick counts should still alternate cleanly. Guards against accidental sign + * issues if a long tick value approaches Long.MAX_VALUE. + */ + @Test + public void testSelectUrlForTickHandlesLargeTickValues() { + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1_000_000L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1_000_001L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, Long.MAX_VALUE - 1)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, Long.MAX_VALUE)); + } + + /** + * Per-HA-group future map: distinct HA group names produce distinct map entries. Verifies the + * data-structure invariant that replaces the prior single-static-{@code pollerFuture} field. + */ + @Test + public void testFutureMapIsolatesEntriesPerHaGroup() { + Map> futureMap = GetClusterRoleRecordUtil.getFutureMapForTesting(); + Map schedulerMap = + GetClusterRoleRecordUtil.getSchedulerMapForTesting(); + assertTrue("futureMap should start empty", futureMap.isEmpty()); + assertTrue("schedulerMap should start empty", schedulerMap.isEmpty()); + + ScheduledFuture futureA = mock(ScheduledFuture.class); + ScheduledFuture futureB = mock(ScheduledFuture.class); + ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class); + ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class); + + futureMap.put(HA_GROUP_A, futureA); + futureMap.put(HA_GROUP_B, futureB); + schedulerMap.put(HA_GROUP_A, schedulerA); + schedulerMap.put(HA_GROUP_B, schedulerB); + + assertEquals(2, futureMap.size()); + assertEquals(2, schedulerMap.size()); + assertNotNull(futureMap.get(HA_GROUP_A)); + assertNotNull(futureMap.get(HA_GROUP_B)); + // Distinct entries — adding HA_GROUP_B did not overwrite HA_GROUP_A's entry. + assertFalse("entries for distinct HA groups must be different references", + futureMap.get(HA_GROUP_A) == futureMap.get(HA_GROUP_B)); + } + + /** + * Removal/cancellation isolation: cancelling one HA group's poller cancels only that group's + * future and does not touch the peer group's future. This is the key behavioural invariant + * the prior single-static field violated. + */ + @Test + public void testCancelOneHaGroupDoesNotCancelOthers() { + Map> futureMap = GetClusterRoleRecordUtil.getFutureMapForTesting(); + Map schedulerMap = + GetClusterRoleRecordUtil.getSchedulerMapForTesting(); + + ScheduledFuture futureA = mock(ScheduledFuture.class); + ScheduledFuture futureB = mock(ScheduledFuture.class); + ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class); + ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class); + when(futureA.cancel(false)).thenReturn(true); + when(futureB.cancel(false)).thenReturn(true); + + futureMap.put(HA_GROUP_A, futureA); + futureMap.put(HA_GROUP_B, futureB); + schedulerMap.put(HA_GROUP_A, schedulerA); + schedulerMap.put(HA_GROUP_B, schedulerB); + + // Mirror the cancel-on-active path inside schedulePoller: remove the entry for HA_GROUP_A, + // cancel its future, shut down its scheduler. HA_GROUP_B's entries must be untouched. + ScheduledFuture removedFuture = futureMap.remove(HA_GROUP_A); + assertNotNull(removedFuture); + removedFuture.cancel(false); + ScheduledExecutorService removedScheduler = schedulerMap.remove(HA_GROUP_A); + assertNotNull(removedScheduler); + removedScheduler.shutdown(); + + verify(futureA, times(1)).cancel(false); + verify(futureB, never()).cancel(false); + verify(schedulerA, times(1)).shutdown(); + verify(schedulerB, never()).shutdown(); + assertNull("HA_GROUP_A entry should be removed", futureMap.get(HA_GROUP_A)); + assertNotNull("HA_GROUP_B entry should remain", futureMap.get(HA_GROUP_B)); + assertEquals(1, futureMap.size()); + assertEquals(1, schedulerMap.size()); + } +}