diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index 3a0aaec2aeda8..244a1ff0c83ca 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -89,6 +89,10 @@ public void setUp() { .setDnConnectionTimeoutMs(600000) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); + senderEnv + .getConfig() + .getDataNodeConfig() + .setMetricReporterType(Collections.singletonList("PROMETHEUS")); receiverEnv .getConfig() @@ -102,6 +106,10 @@ public void setUp() { .setDnConnectionTimeoutMs(600000) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); + receiverEnv + .getConfig() + .getDataNodeConfig() + .setMetricReporterType(Collections.singletonList("PROMETHEUS")); senderEnv.initClusterEnvironment(3, 3, 180); receiverEnv.initClusterEnvironment(3, 3, 180); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java index ff06e6531cfcf..48c2f7a030890 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java @@ -19,11 +19,13 @@ package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.exception.ClientManagerException; import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.RegionRoleType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; @@ -54,18 +56,22 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2DualTreeAutoEnhanced.class}) public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT { + private static final double SYNC_LAG_DELTA = 0.001; + @Override @Before public void setUp() { @@ -82,6 +88,10 @@ public void setUp() { .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); + senderEnv + .getConfig() + .getDataNodeConfig() + .setMetricReporterType(Collections.singletonList("PROMETHEUS")); senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH"); receiverEnv @@ -95,6 +105,10 @@ public void setUp() { .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); + receiverEnv + .getConfig() + .getDataNodeConfig() + .setMetricReporterType(Collections.singletonList("PROMETHEUS")); // 10 min, assert that the operations will not time out senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); @@ -315,40 +329,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception { 
Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"),
           null);
 
-      final AtomicInteger leaderPort = new AtomicInteger(-1);
-      final TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
-      showRegionResp
-          .getRegionInfoList()
-          .forEach(
-              regionInfo -> {
-                if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
-                  leaderPort.set(regionInfo.getClientRpcPort());
-                }
-              });
-
-      int leaderIndex = -1;
-      for (int i = 0; i < 3; ++i) {
-        if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
-          leaderIndex = i;
-          try {
-            senderEnv.shutdownDataNode(i);
-          } catch (final Throwable e) {
-            e.printStackTrace();
-            return;
-          }
-          try {
-            TimeUnit.SECONDS.sleep(1);
-          } catch (final InterruptedException ignored) {
-          }
-          try {
-            senderEnv.startDataNode(i);
-            ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
-          } catch (final Throwable e) {
-            e.printStackTrace();
-            return;
-          }
-        }
-      }
+      final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
       if (leaderIndex == -1) { // ensure the leader is stopped
         fail();
       }
@@ -363,6 +344,7 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
           Collections.singleton("2,"));
+      waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
     }
 
     try {
@@ -411,6 +393,181 @@ public void testPipeAfterDataRegionLeaderStop() throws Exception {
     }
   }
 
+  /**
+   * Restarts the sender DataNode hosting a DataRegion leader of the given database.
+   *
+   * Among the leaders reported for the database, the one with the smallest consensus group id is
+   * picked. The DataNode whose port equals that leader's client RPC port is shut down, started
+   * again after a one-second pause, and the cluster status is then re-checked.
+   *
+   * @param client ConfigNode client used to list the regions
+   * @param database database whose DataRegion leaders are considered
+   * @return index of the restarted DataNode in {@code senderEnv}, or {@code -1} if no leader or
+   *     matching DataNode was found, or if the restart failed or was interrupted
+   * @throws TException if the show-region request fails
+   */
+  private int restartTreeDataRegionLeader(
+      final SyncConfigNodeIServiceClient client, final String database) throws TException {
+    final List<TRegionInfo> leaderRegionInfoList =
+        showTreeDataRegionLeaders(Collections.singletonList(database), client);
+    if (leaderRegionInfoList.isEmpty()) {
+      return -1;
+    }
+
+    final TRegionInfo targetRegionInfo =
+        leaderRegionInfoList.stream()
+            .min(Comparator.comparingInt(regionInfo -> regionInfo.getConsensusGroupId().getId()))
+            .orElse(null);
+    if (targetRegionInfo == null) {
+      return -1;
+    }
+
+    final int leaderPort = targetRegionInfo.getClientRpcPort();
+    for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+      if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+        continue;
+      }
+
+      try {
+        senderEnv.shutdownDataNode(i);
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (final InterruptedException ignored) {
+        Thread.currentThread().interrupt();
+        return -1;
+      }
+
+      try {
+        senderEnv.startDataNode(i);
+        ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+      return i;
+    }
+    return -1;
+  }
+
+  /**
+   * Waits until every DataRegion leader of the given databases reports a zero {@code syncLag}.
+   *
+   * Every 500 ms, for at most 2 minutes, the leaders are listed again and the metrics page of
+   * each leader's DataNode is fetched and checked.
+   *
+   * @param databases databases whose DataRegion leaders are checked
+   */
+  private void waitForTreeDataRegionReplicationComplete(final List<String> databases) {
+    await()
+        .pollInterval(500, TimeUnit.MILLISECONDS)
+        .atMost(2, TimeUnit.MINUTES)
+        .untilAsserted(
+            () -> {
+              try (final SyncConfigNodeIServiceClient client =
+                  (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                final List<TRegionInfo> leaderRegionInfoList =
+                    showTreeDataRegionLeaders(databases, client);
+                Assert.assertFalse(
+                    "No tree DataRegion leader found for databases " + databases,
+                    leaderRegionInfoList.isEmpty());
+
+                for (final TRegionInfo regionInfo : leaderRegionInfoList) {
+                  final DataNodeWrapper leaderNode =
+                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                  final String metricsUrl =
+                      "http://"
+                          + leaderNode.getIp()
+                          + ":"
+                          + leaderNode.getMetricPort()
+                          + "/metrics";
+                  final String metricsContent = senderEnv.getUrlContent(metricsUrl, null);
+                  Assert.assertNotNull(
+                      "Failed to fetch metrics from leader DataNode at " + metricsUrl,
+                      metricsContent);
+                  assertSyncLagIsZero(metricsContent, buildDataRegionTag(regionInfo), metricsUrl);
+                }
+              }
+            });
+  }
+
+  /**
+   * Lists the DataRegion replicas of the given databases whose role is leader, after asserting
+   * that the show-region request returned a success status.
+   *
+   * @param databases databases to query
+   * @param client ConfigNode client used to send the show-region request
+   * @return the leader region infos, possibly empty
+   * @throws TException if the show-region request fails
+   */
+  private List<TRegionInfo> showTreeDataRegionLeaders(
+      final List<String> databases, final SyncConfigNodeIServiceClient client) throws TException {
+    final TShowRegionResp showRegionResp =
+        client.showRegion(
+            new TShowRegionReq()
+                .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                .setDatabases(databases));
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+    final List<TRegionInfo> result = new ArrayList<>();
+    for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+      if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+        result.add(regionInfo);
+      }
+    }
+    return result;
+  }
+
+  /** Returns the sender DataNode with the given port; fails the test if none matches. */
+  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+    for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) {
+      if (dataNodeWrapper.getPort() == port) {
+        return dataNodeWrapper;
+      }
+    }
+    fail("Failed to find DataNodeWrapper for client rpc port " + port);
+    return null;
+  }
+
+  /** Builds the {@code region} label value used in the metrics, e.g. {@code DataRegion[1]}. */
+  private String buildDataRegionTag(final TRegionInfo regionInfo) {
+    return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]";
+  }
+
+  /**
+   * Asserts that the {@code syncLag} sample of the given region in the metrics text is zero.
+   *
+   * Only the first matching {@code iot_consensus} line is checked; the test fails if there is
+   * no such line or if the line has no space-separated value.
+   *
+   * @param metricsContent metrics text fetched from the DataNode
+   * @param dataRegionTag value of the {@code region} label to look for
+   * @param metricsUrl URL the metrics were fetched from, used in failure messages
+   */
+  private void assertSyncLagIsZero(
+      final String metricsContent, final String dataRegionTag, final String metricsUrl) {
+    for (final String line : metricsContent.split("\\R")) {
+      if (!line.startsWith("iot_consensus{")
+          || !line.contains("type=\"syncLag\"")
+          || !line.contains("region=\"" + dataRegionTag + "\"")) {
+        continue;
+      }
+      final int lastSpaceIndex = line.lastIndexOf(' ');
+      Assert.assertTrue("Malformed syncLag metric line: " + line, lastSpaceIndex > 0);
+      Assert.assertEquals(
+          "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl + " but got " + line,
+          0.0,
+          Double.parseDouble(line.substring(lastSpaceIndex + 1)),
+          SYNC_LAG_DELTA);
+      return;
+    }
+    fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+  }
+
   @Test
   public void testPipeAfterRegisterNewDataNode() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);