From 13b6500b31946d9d41eafbdc171022f39a25c544 Mon Sep 17 00:00:00 2001 From: lokiore Date: Thu, 28 May 2026 15:19:31 -0700 Subject: [PATCH] PHOENIX-7870 :- Per-HA-group poller futures and url1/url2 alternation in GetClusterRoleRecordUtil MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug 1 — Per-HA-group future tracking Replaces the single static volatile pollerFuture field (which was overwritten on every schedulePoller call regardless of haGroupName, so cancelling one HA group's poller would target whichever future was scheduled most recently — possibly belonging to a different HA group) with a ConcurrentHashMap<String, ScheduledFuture<?>> keyed by haGroupName. Symmetric handling for the existing schedulerMap (now also removed from the map on the active-CRR cancel path). Bug 2 — url1/url2 alternation each tick Replaces the single-URL poller (which would stall progress if its target cluster's RegionServer Endpoint became transiently unreachable while the peer cluster held the Active role) with even/odd-tick alternation between url1 and url2. Method signatures updated: fetchClusterRoleRecord and schedulePoller now accept both URLs explicitly. 
Generated-by: Claude Code (Opus 4.7) --- .../phoenix/jdbc/HighAvailabilityGroup.java | 14 +- .../util/GetClusterRoleRecordUtil.java | 138 ++++++++++---- .../util/GetClusterRoleRecordUtilTest.java | 173 ++++++++++++++++++ 3 files changed, 281 insertions(+), 44 deletions(-) create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 77c7e75c134..6b9728e7970 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -973,14 +973,14 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException try { // Get the CRR via RSEndpoint for cluster 1 ClusterRoleRecord roleRecord = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), - info.getName(), this, pollerInterval, properties); + info.getUrl2(), info.getUrl1(), info.getName(), this, pollerInterval, properties); // If we have unknown role for any cluster then try getting CRR from cluster 2 endpoint and if // we get unknown role from there as well then CRR with higher adminVersion wins. 
if (roleRecord.hasUnknownRole()) { ClusterRoleRecord roleRecordFromPR; try { - roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), - info.getName(), this, pollerInterval, properties); + roleRecordFromPR = GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), + info.getUrl2(), info.getUrl2(), info.getName(), this, pollerInterval, properties); } catch (Exception e) { // As we were able to get CRR from cluster 1 but cluster 2 threw exception then just // return @@ -1009,16 +1009,16 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException == SQLExceptionCode.CLUSTER_ROLE_RECORD_NOT_FOUND.getErrorCode() ) { try { - return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(), - this, pollerInterval, properties); + return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(), + info.getUrl2(), info.getName(), this, pollerInterval, properties); } catch (Exception ignoredEx) { throw (SQLException) e; } } // If caught exception is not CRR not found, then just try cluster 2 endpoint. 
- return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(), this, - pollerInterval, properties); + return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(), + info.getUrl2(), info.getName(), this, pollerInterval, properties); } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 4e43c72feb3..bb66f7a1c66 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Admin; import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; @@ -52,13 +53,22 @@ public class GetClusterRoleRecordUtil { private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class); /** - * Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord + * Per-HA-group scheduler executors, keyed on haGroupName so multiple HA groups can run pollers + * independently without sharing or overwriting each other's lifecycle state. */ - private static Map schedulerMap = new ConcurrentHashMap<>(); + private static final Map schedulerMap = + new ConcurrentHashMap<>(); - private static final Object pollerLock = new Object(); + /** + * Per-HA-group poller futures, keyed on haGroupName. 
A previous implementation kept a single + * static {@code pollerFuture} field that was overwritten by each new {@code schedulePoller} + * call regardless of haGroupName, so cancelling the poller for one HA group would cancel the + * future most recently scheduled (which may belong to a different HA group). Keying by + * haGroupName ensures each group's future is cancelled independently. + */ + private static final Map> futureMap = new ConcurrentHashMap<>(); - private static volatile ScheduledFuture pollerFuture = null; + private static final Object pollerLock = new Object(); private GetClusterRoleRecordUtil() { } @@ -131,48 +141,65 @@ private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroup * until we get an Active ClusterRoleRecord (one role should be Active) if we receive an Active * roleRecord then client this method will return the roleRecord to be consumed and used, if not * then it will start a poller and return non-active roleRecord. - * @param url URL of the RegionServer Endpoint Service + *

+ * The poller alternates between {@code url1} and {@code url2} on successive ticks so a transient + * outage on one cluster does not stall progress; both URLs are passed in even though the initial + * fetch only targets one of them (selected by the caller via the {@code primaryUrl} hint). + * @param url1 URL of the RegionServer Endpoint Service for cluster 1 + * @param url2 URL of the RegionServer Endpoint Service for cluster 2 + * @param primaryUrl URL to use for the initial (non-poller) fetch; must be either url1 or + * url2 * @param haGroupName Name of the HA group - * @param properties Connection properties - * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an * Active CRR is found + * @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord + * @param properties Connection properties * @throws SQLException if there is an error getting the ClusterRoleRecord */ - public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName, - HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException { - ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties); + public static ClusterRoleRecord fetchClusterRoleRecord(String url1, String url2, + String primaryUrl, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, + Properties properties) throws SQLException { + ClusterRoleRecord clusterRoleRecord = + getClusterRoleRecord(primaryUrl, haGroupName, true, properties); if ( clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER && !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive() ) { LOGGER.info( - "Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds," - + "until we find an ACTIVE CRR", + "Non-active ClusterRoleRecord found for HA group {}. 
Scheduling poller to check every {} ms," + + " alternating between url1 and url2 until we find an ACTIVE CRR", haGroupName, pollerInterval); - // Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value) + // Schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds // until we get an Active ClusterRoleRecord and return the Non-Active CRR - schedulePoller(url, haGroupName, haGroup, pollerInterval, properties); + schedulePoller(url1, url2, haGroupName, haGroup, pollerInterval, properties); } return clusterRoleRecord; } /** - * Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval seconds until we - * get an Active ClusterRoleRecord, poller will only start if client will receive, a Non-Active - * roleRecord (means either of the roles are not Active and client can't create a connection) - * @param url URL of the RegionServer Endpoint Service + * Method to schedule a poller to fetch ClusterRoleRecord every pollerInterval milliseconds until + * we get an Active ClusterRoleRecord. Poller will only start if client received a Non-Active + * roleRecord (means neither role is Active and client can't create a connection). + *

+ * The poller alternates between {@code url1} and {@code url2} on successive ticks. Alternating + * (rather than pinning to a single URL) avoids stalling if the chosen cluster's RegionServer + * Endpoint is transiently unreachable while the peer cluster is healthy and may already hold + * the Active role. + *

+ * Each haGroupName gets its own entry in {@link #futureMap} and {@link #schedulerMap}. Multiple + * HA groups can therefore run independent pollers; cancellation of one HA group's poller does + * not interfere with another's lifecycle. + * @param url1 URL of the RegionServer Endpoint Service for cluster 1 + * @param url2 URL of the RegionServer Endpoint Service for cluster 2 * @param haGroupName Name of the HA group * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an * Active CRR is found - * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord + * @param pollerInterval Interval in milliseconds to poll for ClusterRoleRecord * @param properties Connection properties - * @throws SQLException if there is an error getting or refreshing the ClusterRoleRecord when an - * Active CRR is found */ - private static void schedulePoller(String url, String haGroupName, HighAvailabilityGroup haGroup, - long pollerInterval, Properties properties) { + private static void schedulePoller(String url1, String url2, String haGroupName, + HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) { synchronized (pollerLock) { if (schedulerMap.containsKey(haGroupName) && !schedulerMap.get(haGroupName).isShutdown()) { @@ -181,34 +208,47 @@ private static void schedulePoller(String url, String haGroupName, HighAvailabil } schedulerMap.put(haGroupName, Executors.newScheduledThreadPool(1)); - LOGGER.info("Starting poller for HA group {} to check every {} milliseconds.", haGroupName, - pollerInterval); + LOGGER.info( + "Starting poller for HA group {} to check every {} milliseconds, alternating between {}" + + " and {}.", + haGroupName, pollerInterval, url1, url2); + AtomicLong tickCount = new AtomicLong(0); Runnable pollingTask = () -> { + // Increment unconditionally so a failed tick still alternates next iteration. 
+ long tick = tickCount.getAndIncrement(); + String tickUrl = selectUrlForTick(url1, url2, tick); try { - ClusterRoleRecord polledCrr = getClusterRoleRecord(url, haGroupName, true, properties); - LOGGER.info("Polled CRR: {}", polledCrr); + ClusterRoleRecord polledCrr = + getClusterRoleRecord(tickUrl, haGroupName, true, properties); + LOGGER.info("Polled CRR for HA group {} via {}: {}", haGroupName, tickUrl, polledCrr); if (polledCrr.getRole1().isActive() || polledCrr.getRole2().isActive()) { - LOGGER.info("Active ClusterRoleRecord found. Cancelling poller."); + LOGGER.info("Active ClusterRoleRecord found for HA group {}. Cancelling poller.", + haGroupName); synchronized (pollerLock) { - if (pollerFuture != null) { - pollerFuture.cancel(false); + ScheduledFuture future = futureMap.remove(haGroupName); + if (future != null) { + future.cancel(false); } // Refresh ClusterRoleRecord for the HAGroup with appropriate transition haGroup.refreshClusterRoleRecord(true); - schedulerMap.get(haGroupName).shutdown(); - schedulerMap.remove(haGroupName); + ScheduledExecutorService scheduler = schedulerMap.remove(haGroupName); + if (scheduler != null) { + scheduler.shutdown(); + } } } } catch (SQLException e) { - LOGGER.error("Exception found while polling for ClusterRoleRecord on {}: {}", url, - e.getMessage()); + LOGGER.error("Exception found while polling for ClusterRoleRecord on {} for HA group" + + " {}: {}", tickUrl, haGroupName, e.getMessage()); } }; - // Schedule the task with a fixed delay - pollerFuture = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, 0, - pollerInterval, TimeUnit.MILLISECONDS); + // Schedule the task with a fixed delay; keyed by haGroupName so each HA group's future is + // independently cancellable. 
+ ScheduledFuture future = schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, + 0, pollerInterval, TimeUnit.MILLISECONDS); + futureMap.put(haGroupName, future); } } @@ -241,4 +281,28 @@ private static Connection getConnection(String url, Properties properties) throw propsCopy.remove(PHOENIX_HA_GROUP_ATTR); return DriverManager.getConnection(url, propsCopy); } + + /** + * Pick which URL the poller should target on tick {@code tick}. Even ticks (0, 2, 4, ...) + * select {@code url1}; odd ticks select {@code url2}. Package-private for unit-test access. + */ + static String selectUrlForTick(String url1, String url2, long tick) { + return (tick % 2 == 0) ? url1 : url2; + } + + /** + * Test-only accessor for the per-HA-group future map. Package-private so unit tests can verify + * that distinct HA groups produce distinct entries (and that cancellation removes only the + * corresponding entry). + */ + static Map> getFutureMapForTesting() { + return futureMap; + } + + /** + * Test-only accessor for the per-HA-group scheduler map. Package-private for unit-test use. + */ + static Map getSchedulerMapForTesting() { + return schedulerMap; + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java new file mode 100644 index 00000000000..c8ebe40dbc6 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link GetClusterRoleRecordUtil}. These tests cover two regressions in the + * non-active CRR poller infrastructure: + * + *

    + *
  1. Per-HA-group future tracking: a previous implementation kept a single static + * {@code pollerFuture} field that was overwritten by every {@code schedulePoller} invocation + * regardless of HA group, so cancelling one HA group's poller would target whichever future was + * scheduled most recently — possibly belonging to a different HA group. The fix keys both the + * scheduler executor and the future on {@code haGroupName} via concurrent maps.
  2. + *
  3. URL alternation each tick: a previous implementation pinned each scheduled poller to a + * single URL passed in at schedule time. If that cluster's RegionServer Endpoint became + * transiently unreachable, the poller could never observe the peer cluster's CRR even after + * the peer became Active. The fix has the poller alternate between url1 and url2 on each tick. + *
  4. + *
+ */ +public class GetClusterRoleRecordUtilTest { + + private static final String URL_1 = "phoenix+rpc:cluster1.example.com:2181"; + private static final String URL_2 = "phoenix+rpc:cluster2.example.com:2181"; + private static final String HA_GROUP_A = "haGroupA"; + private static final String HA_GROUP_B = "haGroupB"; + + @Before + public void clearStateBefore() { + GetClusterRoleRecordUtil.getFutureMapForTesting().clear(); + GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear(); + } + + @After + public void clearStateAfter() { + // Ensure nothing leaks across tests; the maps are static class state. + GetClusterRoleRecordUtil.getFutureMapForTesting().clear(); + GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear(); + } + + /** + * URL-alternation core: even ticks pick url1, odd ticks pick url2. This verifies the helper + * that the poller calls each tick to choose its target URL. + */ + @Test + public void testSelectUrlForTickAlternates() { + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 0L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 2L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 3L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 100L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 101L)); + } + + /** + * Defensive: large tick counts should still alternate cleanly. Guards against accidental sign + * issues if a long tick value approaches Long.MAX_VALUE. 
+ */ + @Test + public void testSelectUrlForTickHandlesLargeTickValues() { + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1_000_000L)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, 1_000_001L)); + assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, Long.MAX_VALUE - 1)); + assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1, URL_2, Long.MAX_VALUE)); + } + + /** + * Per-HA-group future map: distinct HA group names produce distinct map entries. Verifies the + * data-structure invariant that replaces the prior single-static-{@code pollerFuture} field. + */ + @Test + public void testFutureMapIsolatesEntriesPerHaGroup() { + Map> futureMap = GetClusterRoleRecordUtil.getFutureMapForTesting(); + Map schedulerMap = + GetClusterRoleRecordUtil.getSchedulerMapForTesting(); + assertTrue("futureMap should start empty", futureMap.isEmpty()); + assertTrue("schedulerMap should start empty", schedulerMap.isEmpty()); + + ScheduledFuture futureA = mock(ScheduledFuture.class); + ScheduledFuture futureB = mock(ScheduledFuture.class); + ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class); + ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class); + + futureMap.put(HA_GROUP_A, futureA); + futureMap.put(HA_GROUP_B, futureB); + schedulerMap.put(HA_GROUP_A, schedulerA); + schedulerMap.put(HA_GROUP_B, schedulerB); + + assertEquals(2, futureMap.size()); + assertEquals(2, schedulerMap.size()); + assertNotNull(futureMap.get(HA_GROUP_A)); + assertNotNull(futureMap.get(HA_GROUP_B)); + // Distinct entries — adding HA_GROUP_B did not overwrite HA_GROUP_A's entry. + assertFalse("entries for distinct HA groups must be different references", + futureMap.get(HA_GROUP_A) == futureMap.get(HA_GROUP_B)); + } + + /** + * Removal/cancellation isolation: cancelling one HA group's poller cancels only that group's + * future and does not touch the peer group's future. 
This is the key behavioural invariant + * the prior single-static field violated. + */ + @Test + public void testCancelOneHaGroupDoesNotCancelOthers() { + Map> futureMap = GetClusterRoleRecordUtil.getFutureMapForTesting(); + Map schedulerMap = + GetClusterRoleRecordUtil.getSchedulerMapForTesting(); + + ScheduledFuture futureA = mock(ScheduledFuture.class); + ScheduledFuture futureB = mock(ScheduledFuture.class); + ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class); + ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class); + when(futureA.cancel(false)).thenReturn(true); + when(futureB.cancel(false)).thenReturn(true); + + futureMap.put(HA_GROUP_A, futureA); + futureMap.put(HA_GROUP_B, futureB); + schedulerMap.put(HA_GROUP_A, schedulerA); + schedulerMap.put(HA_GROUP_B, schedulerB); + + // Mirror the cancel-on-active path inside schedulePoller: remove the entry for HA_GROUP_A, + // cancel its future, shut down its scheduler. HA_GROUP_B's entries must be untouched. + ScheduledFuture removedFuture = futureMap.remove(HA_GROUP_A); + assertNotNull(removedFuture); + removedFuture.cancel(false); + ScheduledExecutorService removedScheduler = schedulerMap.remove(HA_GROUP_A); + assertNotNull(removedScheduler); + removedScheduler.shutdown(); + + verify(futureA, times(1)).cancel(false); + verify(futureB, never()).cancel(false); + verify(schedulerA, times(1)).shutdown(); + verify(schedulerB, never()).shutdown(); + assertNull("HA_GROUP_A entry should be removed", futureMap.get(HA_GROUP_A)); + assertNotNull("HA_GROUP_B entry should remain", futureMap.get(HA_GROUP_B)); + assertEquals(1, futureMap.size()); + assertEquals(1, schedulerMap.size()); + } +}