diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c225b11bbe55..96d29b20ac71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -2036,80 +2036,86 @@ public BalanceResponse balance(BalanceRequest request) throws IOException { } synchronized (this.balancer) { - // Only allow one balance run at at time. - if (this.assignmentManager.getRegionTransitScheduledCount() > 0) { - List regionsInTransition = assignmentManager.getRegionsInTransition(); - // if hbase:meta region is in transition, result of assignment cannot be recorded - // ignore the force flag in that case - boolean metaInTransition = assignmentManager.isMetaRegionInTransition(); - List toPrint = regionsInTransition; - int max = 5; - boolean truncated = false; - if (regionsInTransition.size() > max) { - toPrint = regionsInTransition.subList(0, max); - truncated = true; - } + try { + this.balancer.onBalancingStart(); + + // Only allow one balance run at at time. + if (this.assignmentManager.getRegionTransitScheduledCount() > 0) { + List regionsInTransition = assignmentManager.getRegionsInTransition(); + // if hbase:meta region is in transition, result of assignment cannot be recorded + // ignore the force flag in that case + boolean metaInTransition = assignmentManager.isMetaRegionInTransition(); + List toPrint = regionsInTransition; + int max = 5; + boolean truncated = false; + if (regionsInTransition.size() > max) { + toPrint = regionsInTransition.subList(0, max); + truncated = true; + } - if (!request.isIgnoreRegionsInTransition() || metaInTransition) { - LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition - + ") because " + assignmentManager.getRegionTransitScheduledCount() - + " region(s) are scheduled to transit " + toPrint - + (truncated ? "(truncated list)" : "")); + if (!request.isIgnoreRegionsInTransition() || metaInTransition) { + LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition + + ") because " + assignmentManager.getRegionTransitScheduledCount() + + " region(s) are scheduled to transit " + toPrint + + (truncated ? "(truncated list)" : "")); + return responseBuilder.build(); + } + } + if (this.serverManager.areDeadServersInProgress()) { + LOG.info("Not running balancer because processing dead regionserver(s): " + + this.serverManager.getDeadServers()); return responseBuilder.build(); } - } - if (this.serverManager.areDeadServersInProgress()) { - LOG.info("Not running balancer because processing dead regionserver(s): " - + this.serverManager.getDeadServers()); - return responseBuilder.build(); - } - if (this.cpHost != null) { - try { - if (this.cpHost.preBalance(request)) { - LOG.debug("Coprocessor bypassing balancer request"); + if (this.cpHost != null) { + try { + if (this.cpHost.preBalance(request)) { + LOG.debug("Coprocessor bypassing balancer request"); + return responseBuilder.build(); + } + } catch (IOException ioe) { + LOG.error("Error invoking master coprocessor preBalance()", ioe); return responseBuilder.build(); } - } catch (IOException ioe) { - LOG.error("Error invoking master coprocessor preBalance()", ioe); - return responseBuilder.build(); } - } - Map>> assignments = - this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager, - this.serverManager.getOnlineServersList()); - for (Map> serverMap : assignments.values()) { - serverMap.keySet().removeAll(this.serverManager.getDrainingServersList()); - } + Map>> assignments = + this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager, + this.serverManager.getOnlineServersList()); + for (Map> serverMap : assignments.values()) { + serverMap.keySet().removeAll(this.serverManager.getDrainingServersList()); + } - // Give the balancer the current cluster state. - this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); + // Give the balancer the current cluster state. + this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); - List plans = this.balancer.balanceCluster(assignments); + List plans = this.balancer.balanceCluster(assignments); - responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ? 0 : plans.size()); + responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ? 0 : plans.size()); - if (skipRegionManagementAction("balancer")) { - // make one last check that the cluster isn't shutting down before proceeding. - return responseBuilder.build(); - } + if (skipRegionManagementAction("balancer")) { + // make one last check that the cluster isn't shutting down before proceeding. + return responseBuilder.build(); + } - // For dry run we don't actually want to execute the moves, but we do want - // to execute the coprocessor below - List sucRPs = - request.isDryRun() ? Collections.emptyList() : executeRegionPlansWithThrottling(plans); + // For dry run we don't actually want to execute the moves, but we do want + // to execute the coprocessor below + List sucRPs = + request.isDryRun() ? Collections.emptyList() : executeRegionPlansWithThrottling(plans); - if (this.cpHost != null) { - try { - this.cpHost.postBalance(request, sucRPs); - } catch (IOException ioe) { - // balancing already succeeded so don't change the result - LOG.error("Error invoking master coprocessor postBalance()", ioe); + if (this.cpHost != null) { + try { + this.cpHost.postBalance(request, sucRPs); + } catch (IOException ioe) { + // balancing already succeeded so don't change the result + LOG.error("Error invoking master coprocessor postBalance()", ioe); + } } - } - responseBuilder.setMovesExecuted(sucRPs.size()); + responseBuilder.setMovesExecuted(sucRPs.size()); + } finally { + this.balancer.onBalancingComplete(); + } } // If LoadBalancer did not generate any plans, it means the cluster is already balanced. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 908e04e20516..0ffe19962266 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -188,6 +188,14 @@ default void throttle(RegionPlan plan) throws Exception { // noop } + default void onBalancingStart() { + // noop + } + + default void onBalancingComplete() { + // noop + } + /** * @return true if Master carries regions * @deprecated since 2.4.0, will be removed in 3.0.0. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java index df0adc98fe68..39486c3b4c35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.RegionMetrics; @@ -64,13 +66,36 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer { private Long sleepTime; private Configuration configuration; + /** + * Tracks whether a balance run is currently in progress. + */ + private final AtomicBoolean isBalancing = new AtomicBoolean(false); + + /** + * Holds a configuration update that arrived while a balance run was in progress. + */ + private AtomicReference pendingConfiguration = new AtomicReference<>(); + public enum GeneratorFunctionType { LOAD, CACHE_RATIO } @Override - public synchronized void loadConf(Configuration configuration) { + public void loadConf(Configuration configuration) { + // If balance is running, store configuration in pendingConfiguration and return immediately. + // Defer the config update. + if (isBalancing.get()) { + LOG.debug( + "Balance is in progress, defer applying configuration change until balance completed."); + pendingConfiguration.set(configuration); + } else { + // Apply configuration change immediately. + updateConfiguration(configuration); + } + } + + public void updateConfiguration(Configuration configuration) { this.configuration = configuration; this.costFunctions = new ArrayList<>(); super.loadConf(configuration); @@ -79,6 +104,38 @@ public synchronized void loadConf(Configuration configuration) { sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis()); } + /** + * Sets {@link #isBalancing} to {@code true} before a balance run starts. + */ + @Override + public void onBalancingStart() { + LOG.debug("Setting isBalancing to true as balance is starting"); + isBalancing.set(true); + } + + /** + * Sets {@link #isBalancing} to {@code false} after a balance run completes and applies any + * pending configuration that arrived during balancing. + */ + @Override + public void onBalancingComplete() { + LOG.debug("Setting isBalancing to false as balance is completed"); + isBalancing.set(false); + applyPendingConfiguration(); + } + + /** + * If a pending configuration was stored during a balance run, apply it and clear the pending + * reference. + */ + public void applyPendingConfiguration() { + Configuration toApply = pendingConfiguration.getAndSet(null); + if (toApply != null) { + LOG.info("Applying pending configuration after balance completed."); + updateConfiguration(toApply); + } + } + @Override protected Map, CandidateGenerator> createCandidateGenerators(Configuration conf) { @@ -193,10 +250,13 @@ public void throttle(RegionPlan plan) { + "Throttling move for {}ms.", plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime); } - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw new RuntimeException(e); + synchronized (this) { + try { + // Release the monitor while waiting to avoid blocking other threads. + wait(sleepTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java index 335a719a1f9e..485ae7544436 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java @@ -33,6 +33,10 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; @@ -599,4 +603,151 @@ public void testBalancerNotThrowNPEWhenBalancerPlansIsNull() throws Exception { fail("NPE should not be thrown"); } } + + /** + * This test verifies that when loadConf/onConfigurationChange is called on a + * CacheAwareLoadBalancer while a balance run is in progress, the configuration update: 1. Does + * not block (returns quickly without waiting for balancing to finish) 2. Does not affect the + * ongoing balance run (the configuration used during balancing remains the old one) 3. Is applied + * correctly after the balance run completes + */ + @Test(timeout = 60000) + public void testConfigUpdateDuringBalance() throws Exception { + Float expectedOldRatioThreshold = 0.8f; + Float expectedNewRatioThreshold = 0.95f; + long throttleTimeMs = 10000; + + // Actual old ratio threshold used during balance + float[] actualOldRatioThresholdDuringBalance = new float[1]; + + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list"); + conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttleTimeMs); + conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedOldRatioThreshold); + + CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer(); + MasterServices services = mock(MasterServices.class); + when(services.getConfiguration()).thenReturn(conf); + balancer.setMasterServices(services); + balancer.loadConf(conf); + balancer.initialize(); + + Map> clusterState = new HashMap<>(); + ServerName server0 = servers.get(0); + ServerName server1 = servers.get(1); + ServerName server2 = servers.get(2); + + // Setup cluster: all 3 regions on server0 (unbalanced) + List regionsOnServer0 = randomRegions(3); + List regionsOnServer1 = randomRegions(0); + List regionsOnServer2 = randomRegions(0); + + clusterState.put(server0, regionsOnServer0); + clusterState.put(server1, regionsOnServer1); + clusterState.put(server2, regionsOnServer2); + + // Mock metrics: NO cache info for any region = all will be throttled + Map serverMetricsMap = new TreeMap<>(); + serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1, + 0.0f, new ArrayList<>(), 0, 10)); + serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2, + 0.0f, new ArrayList<>(), 0, 10)); + + ClusterMetrics clusterMetrics = mock(ClusterMetrics.class); + when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap); + balancer.updateClusterMetrics(clusterMetrics); + + final Map>> loadOfAllTable = + (Map) mockClusterServersWithTables(clusterState); + + // Verify initial configuration + assertEquals(expectedOldRatioThreshold, balancer.ratioThreshold, 0.001f); + + CountDownLatch balanceStarted = new CountDownLatch(1); + CountDownLatch updateConfigInitiated = new CountDownLatch(1); + + long[] configUpdateDuration = new long[1]; + long[] balanceDuration = new long[1]; + + ExecutorService executor = Executors.newFixedThreadPool(2); + + try { + // Thread 1 Simulate similar flow to HMaster.balance() which holds synchronized(balancer) for + // the duration of balance + Future balanceFuture = executor.submit(() -> { + try { + long start = EnvironmentEdgeManager.currentTime(); + synchronized (balancer) { + try { + // Simulate beginning of HMaster.balance() mark balancing window open + balancer.onBalancingStart(); + balanceStarted.countDown(); + List plans = balancer.balanceCluster(loadOfAllTable); + + LOG.info("Balance generated {} plans, executing with throttling", + plans != null ? plans.size() : 0); + + if (plans != null) { + for (int i = 0; i < plans.size(); i++) { + RegionPlan plan = plans.get(i); + balancer.throttle(plan); + } + } + // Wait until config update is initiated while balance is still in progress + updateConfigInitiated.await(); + + // Old config should still be visible during current balance run + actualOldRatioThresholdDuringBalance[0] = balancer.ratioThreshold; + } finally { + balancer.onBalancingComplete(); + } + } + return EnvironmentEdgeManager.currentTime() - start; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Thread 2: Simulate update_all_config / onConfigurationChange + Future configUpdateFuture = executor.submit(() -> { + try { + // Wait for balance to start + balanceStarted.await(); + long startTime = EnvironmentEdgeManager.currentTime(); + + // Call onConfigurationChange - should NOT hang + Configuration newConf = HBaseConfiguration.create(); + newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list"); + newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000); + newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD, expectedNewRatioThreshold); + balancer.onConfigurationChange(newConf); + updateConfigInitiated.countDown(); + + return EnvironmentEdgeManager.currentTime() - startTime; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Wait for both threads to complete + configUpdateDuration[0] = configUpdateFuture.get(); + balanceDuration[0] = balanceFuture.get(); + System.out.println("Balance duration (ms): " + balanceDuration[0]); + System.out.println("Config update duration (ms): " + configUpdateDuration[0]); + + // Verify that ratio threshold used during balance is stll the old + assertEquals(expectedOldRatioThreshold, actualOldRatioThresholdDuringBalance[0], 0.001f); + + // Verify that config updated successfully after balance completed + assertEquals(expectedNewRatioThreshold, balancer.ratioThreshold, 0.001f); + + // Verify that config update didn't hang/timeout waiting for balance + assertTrue(configUpdateDuration[0] < balanceDuration[0]); + + } finally { + executor.shutdownNow(); + } + } }