diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index 7541f500e45d0..7a7c7a5ed4429 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -190,6 +190,18 @@ public static TSStatus getStatus(List statusList) { return status; } + public static TSStatus extractFailureStatues(final TSStatus input) { + return input.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ? new TSStatus(input.getCode()) + .setMessage(input.getMessage()) + .setSubStatus( + input.getSubStatus().stream() + .filter( + status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .collect(Collectors.toList())) + : input; + } + /** * Convert from {@link TSStatusCode} to {@link TSStatus}, which has message appended with existing * status message diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java index 6f6d4b162d4d1..e9e257ac254a4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -51,7 +52,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -176,8 +176,6 @@ private boolean alterEncodingCompressorInSchemaRegion(final ConfigNodeProcedureE .setCompressor(compressor) .setEncoding(encoding))) { - private final Map failureMap = new HashMap<>(); - @Override protected List processResponseOfOneDataNode( final TDataNodeLocation dataNodeLocation, @@ -203,7 +201,7 @@ protected List processResponseOfOneDataNode( failedRegionList.addAll(consensusGroupIdList); } if (!failedRegionList.isEmpty()) { - failureMap.put(dataNodeLocation, response); + failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response)); } else { failureMap.remove(dataNodeLocation); } @@ -219,7 +217,7 @@ protected void onAllReplicasetFailure( new MetadataException( String.format( "Alter encoding compressor %s in schema regions failed. Failures: %s", - requestMessage, failureMap)))); + requestMessage, printFailureMap())))); interruptTask(); } }; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java index d600db8207ddd..cc41d941c208b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java @@ -59,6 +59,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; +import java.util.stream.Collectors; public class AlterLogicalViewProcedure extends StateMachineProcedure { @@ -72,6 +73,8 @@ public class AlterLogicalViewProcedure private transient PathPatternTree pathPatternTree; private transient ByteBuffer patternTreeBytes; + protected final Map failureMap = new HashMap<>(); + public AlterLogicalViewProcedure(final boolean isGeneratedByPipe) { super(isGeneratedByPipe); } @@ -390,11 +393,14 @@ protected void onAllReplicasetFailure( new ProcedureException( new MetadataException( String.format( - "Alter view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failure nodes: %s", + "Alter view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failure nodes: %s, statuses: %s", viewPathToSourceMap.keySet(), taskName, consensusGroupId.id, - dataNodeLocationSet)))); + dataNodeLocationSet.stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()), + failureStatusList)))); interruptTask(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java index e31d8a3bbd57c..ccd83014f6433 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java @@ -42,6 +42,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; @@ -54,7 +55,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -195,8 +195,6 @@ private boolean alterTimeSeriesDataType(final ConfigNodeProcedureEnv env) { ByteBuffer.wrap(stream.toByteArray())); })) { - private final Map failureMap = new HashMap<>(); - @Override protected List processResponseOfOneDataNode( final TDataNodeLocation dataNodeLocation, @@ -221,7 +219,7 @@ protected List processResponseOfOneDataNode( failedRegionList.addAll(consensusGroupIdList); } if (!failedRegionList.isEmpty()) { - failureMap.put(dataNodeLocation, response); + failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response)); } else { failureMap.remove(dataNodeLocation); } @@ -240,7 +238,7 @@ protected void onAllReplicasetFailure( measurementPath.getFullPath(), measurementPath.getSeriesType(), dataType, - failureMap)))); + printFailureMap())))); interruptTask(); } }; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java index f632f5f1305fa..104128a042490 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java @@ -22,11 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.rpc.TSStatusCode; import java.util.ArrayList; import java.util.HashMap; @@ -35,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.function.BiFunction; +import java.util.stream.Collectors; import static org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getAllReplicaDataNodeRegionGroupMap; import static org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getLeaderDataNodeRegionGroupMap; @@ -48,7 +51,7 @@ public abstract class DataNodeRegionTaskExecutor { protected final CnToDnAsyncRequestType dataNodeRequestType; protected final BiFunction, Q> dataNodeRequestGenerator; - + protected final Map failureMap = new HashMap<>(); private boolean isInterrupted = false; protected DataNodeRegionTaskExecutor( @@ -229,4 +232,16 @@ protected abstract List processResponseOfOneDataNode( */ protected abstract void onAllReplicasetFailure( final TConsensusGroupId consensusGroupId, final Set dataNodeLocationSet); + + protected String printFailureMap() { + return failureMap.entrySet().stream() + .collect( + Collectors.toMap( + entry -> "DataNodeId: " + entry.getKey().getDataNodeId(), + entry -> + entry.getValue().getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ? entry.getValue().getSubStatus() + : entry.getValue())) + .toString(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeTSStatusTaskExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeTSStatusTaskExecutor.java index 700cb3a9a49ab..37c93b3941d59 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeTSStatusTaskExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeTSStatusTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import java.util.ArrayList; @@ -34,6 +35,7 @@ public abstract class DataNodeTSStatusTaskExecutor extends DataNodeRegionTaskExecutor { + protected List successResult = new ArrayList<>(); protected DataNodeTSStatusTaskExecutor( final ConfigNodeProcedureEnv env, @@ -56,6 +58,7 @@ protected List processResponseOfOneDataNode( final TSStatus response) { final List failedRegionList = new ArrayList<>(); if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureMap.remove(dataNodeLocation); return failedRegionList; } @@ -69,6 +72,42 @@ protected List processResponseOfOneDataNode( } else { failedRegionList.addAll(consensusGroupIdList); } + if (!failedRegionList.isEmpty()) { + failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response)); + } else { + failureMap.remove(dataNodeLocation); + } + return failedRegionList; + } + + protected List processResponseOfOneDataNodeWithSuccessResult( + final TDataNodeLocation dataNodeLocation, + final List consensusGroupIdList, + final TSStatus response) { + final List failedRegionList = new ArrayList<>(); + if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + successResult.add(response); + } else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + List subStatusList = response.getSubStatus(); + for (int i = 0; i < subStatusList.size(); i++) { + if (subStatusList.get(i).getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + successResult.add(subStatusList.get(i)); + } else { + failedRegionList.add(consensusGroupIdList.get(i)); + } + } + } else { + failedRegionList.addAll(consensusGroupIdList); + } + if (!failedRegionList.isEmpty()) { + failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response)); + } else { + failureMap.remove(dataNodeLocation); + } return failedRegionList; } + + public List getSuccessResult() { + return successResult; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java index 0e4a17887c9ba..54249754d7e88 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java @@ -143,7 +143,6 @@ private long constructBlackList(ConfigNodeProcedureEnv env) { if (targetSchemaRegionGroup.isEmpty()) { return 0; } - List successResult = new ArrayList<>(); DeactivateTemplateRegionTaskExecutor constructBlackListTask = new DeactivateTemplateRegionTaskExecutor( @@ -156,26 +155,11 @@ private long constructBlackList(ConfigNodeProcedureEnv env) { consensusGroupIdList, dataNodeRequest))) { @Override protected List processResponseOfOneDataNode( - TDataNodeLocation dataNodeLocation, - List consensusGroupIdList, - TSStatus response) { - List failedRegionList = new ArrayList<>(); - if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(response); - } else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - List subStatusList = response.getSubStatus(); - for (int i = 0; i < subStatusList.size(); i++) { - if (subStatusList.get(i).getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(subStatusList.get(i)); - } else { - failedRegionList.add(consensusGroupIdList.get(i)); - } - } - } else { - failedRegionList.addAll(consensusGroupIdList); - } - return failedRegionList; + final TDataNodeLocation dataNodeLocation, + final List consensusGroupIdList, + final TSStatus response) { + return processResponseOfOneDataNodeWithSuccessResult( + dataNodeLocation, consensusGroupIdList, response); } }; constructBlackListTask.execute(); @@ -185,7 +169,7 @@ protected List processResponseOfOneDataNode( } long preDeletedNum = 0; - for (TSStatus resp : successResult) { + for (TSStatus resp : constructBlackListTask.getSuccessResult()) { preDeletedNum += Long.parseLong(resp.getMessage()); } return preDeletedNum; @@ -465,12 +449,12 @@ protected void onAllReplicasetFailure( new ProcedureException( new MetadataException( String.format( - "Deactivate template of %s failed when [%s] because failed to execute in all replicaset of %s %s. Failure nodes: %s", + "Deactivate template of %s failed when [%s] because failed to execute in all replicaset of %s %s. Failure: %s", requestMessage, taskName, consensusGroupId.type, consensusGroupId.id, - dataNodeLocationSet)))); + printFailureMap())))); interruptTask(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java index 1cfead14fcf32..0c68fa62e1884 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java @@ -53,7 +53,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -135,7 +134,6 @@ private long constructBlackList(ConfigNodeProcedureEnv env) { if (targetSchemaRegionGroup.isEmpty()) { return 0; } - List successResult = new ArrayList<>(); DeleteLogicalViewRegionTaskExecutor constructBlackListTask = new DeleteLogicalViewRegionTaskExecutor( "construct view schema engine black list", @@ -146,25 +144,11 @@ private long constructBlackList(ConfigNodeProcedureEnv env) { new TConstructViewSchemaBlackListReq(consensusGroupIdList, patternTreeBytes))) { @Override protected List processResponseOfOneDataNode( - TDataNodeLocation dataNodeLocation, - List consensusGroupIdList, - TSStatus response) { - List failedRegionList = new ArrayList<>(); - if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(response); - } else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - List subStatusList = response.getSubStatus(); - for (int i = 0; i < subStatusList.size(); i++) { - if (subStatusList.get(i).getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(subStatusList.get(i)); - } else { - failedRegionList.add(consensusGroupIdList.get(i)); - } - } - } else { - failedRegionList.addAll(consensusGroupIdList); - } - return failedRegionList; + final TDataNodeLocation dataNodeLocation, + final List consensusGroupIdList, + final TSStatus response) { + return processResponseOfOneDataNodeWithSuccessResult( + dataNodeLocation, consensusGroupIdList, response); } }; constructBlackListTask.execute(); @@ -173,7 +157,7 @@ protected List processResponseOfOneDataNode( return 0; } - return successResult.stream() + return constructBlackListTask.getSuccessResult().stream() .mapToLong(resp -> Long.parseLong(resp.getMessage())) .reduce(0L, Long::sum); } @@ -357,8 +341,8 @@ protected void onAllReplicasetFailure( new ProcedureException( new MetadataException( String.format( - "Delete view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failure nodes: %s", - requestMessage, taskName, consensusGroupId.id, dataNodeLocationSet)))); + "Delete view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failures: %s", + requestMessage, taskName, consensusGroupId.id, printFailureMap())))); interruptTask(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index c49f80948fc1d..a8f4b6d53c8a3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -153,7 +154,6 @@ private long constructBlackList(final ConfigNodeProcedureEnv env) { return 0; } isAllLogicalView = true; - final List successResult = new ArrayList<>(); final DeleteTimeSeriesRegionTaskExecutor constructBlackListTask = new DeleteTimeSeriesRegionTaskExecutor( "construct schema engine black list", @@ -167,31 +167,20 @@ protected List processResponseOfOneDataNode( final TDataNodeLocation dataNodeLocation, final List consensusGroupIdList, final TSStatus response) { - final List failedRegionList = new ArrayList<>(); if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { isAllLogicalView = false; - successResult.add(response); } else if (response.getCode() == TSStatusCode.ONLY_LOGICAL_VIEW.getStatusCode()) { successResult.add(response); - } else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - final List subStatusList = response.getSubStatus(); - for (int i = 0; i < subStatusList.size(); i++) { - if (subStatusList.get(i).getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(subStatusList.get(i)); - } else { - failedRegionList.add(consensusGroupIdList.get(i)); - } - } - } else { - failedRegionList.addAll(consensusGroupIdList); + return Collections.emptyList(); } - return failedRegionList; + return processResponseOfOneDataNodeWithSuccessResult( + dataNodeLocation, consensusGroupIdList, response); } }; constructBlackListTask.execute(); return !isFailed() - ? successResult.stream() + ? constructBlackListTask.getSuccessResult().stream() .mapToLong(resp -> Long.parseLong(resp.getMessage())) .reduce(Long::sum) .orElse(0L) @@ -446,12 +435,12 @@ protected void onAllReplicasetFailure( new ProcedureException( new MetadataException( String.format( - "Delete time series %s failed when [%s] because failed to execute in all replicaset of %s %s. Failure nodes: %s", + "Delete time series %s failed when [%s] because failed to execute in all replicaset of %s %s. Failures: %s", requestMessage, taskName, consensusGroupId.type, consensusGroupId.id, - dataNodeLocationSet)))); + printFailureMap())))); interruptTask(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java index e2ab08e936663..f5dcae9e73918 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SchemaUtils.java @@ -43,6 +43,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -182,6 +183,7 @@ protected List processResponseOfOneDataNode( respList.add(response); List failedRegionList = new ArrayList<>(); if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureMap.remove(dataNodeLocation); return failedRegionList; } @@ -195,6 +197,12 @@ protected List processResponseOfOneDataNode( } else { failedRegionList.addAll(consensusGroupIdList); } + if (!failedRegionList.isEmpty()) { + failureMap.put( + dataNodeLocation, RpcUtils.extractFailureStatues(response.getStatus())); + } else { + failureMap.remove(dataNodeLocation); + } return failedRegionList; } @@ -204,8 +212,8 @@ protected void onAllReplicasetFailure( exception[0] = new MetadataException( String.format( - "Failed to execute in all replicaset of schemaRegion %s when checking templates on path %s. Failure nodes: %s", - consensusGroupId.id, deleteDatabasePatternPaths, dataNodeLocationSet)); + "Failed to execute in all replicaset of schemaRegion %s when checking templates on path %s. Failures: %s", + consensusGroupId.id, deleteDatabasePatternPaths, printFailureMap())); interruptTask(); } }; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java index 5509037c6fa95..27ccd77b03a39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java @@ -51,6 +51,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq; import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -293,6 +294,7 @@ protected List processResponseOfOneDataNode( respList.add(response); final List failedRegionList = new ArrayList<>(); if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureMap.remove(dataNodeLocation); return failedRegionList; } @@ -308,6 +310,12 @@ protected List processResponseOfOneDataNode( } else { failedRegionList.addAll(consensusGroupIdList); } + if (!failedRegionList.isEmpty()) { + failureMap.put( + dataNodeLocation, RpcUtils.extractFailureStatues(response.getStatus())); + } else { + failureMap.remove(dataNodeLocation); + } return failedRegionList; } @@ -320,11 +328,11 @@ protected void onAllReplicasetFailure( new MetadataException( String.format( "Set template %s to %s failed when [check time series existence on DataNode] because " - + "failed to check time series existence in all replicaset of schemaRegion %s. Failure nodes: %s", + + "failed to check time series existence in all replicaset of schemaRegion %s. Failures: %s", templateName, templateSetPath, consensusGroupId.id, - dataNodeLocationSet)))); + printFailureMap())))); interruptTask(); } }; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DeleteDevicesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DeleteDevicesProcedure.java index c093084e1e0e9..b78b99539ede5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DeleteDevicesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/DeleteDevicesProcedure.java @@ -33,7 +33,7 @@ import org.apache.iotdb.confignode.manager.ClusterManager; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; -import org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionTaskExecutor; +import org.apache.iotdb.confignode.procedure.impl.schema.DataNodeTSStatusTaskExecutor; import org.apache.iotdb.confignode.procedure.state.schema.DeleteDevicesState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -162,64 +162,52 @@ private void constructBlackList(final ConfigNodeProcedureEnv env) { deletedDevicesNum = 0; return; } - final List successResult = new ArrayList<>(); - new DataNodeRegionTaskExecutor( - env, - relatedSchemaRegionGroup, - false, - CnToDnAsyncRequestType.CONSTRUCT_TABLE_DEVICE_BLACK_LIST, - ((dataNodeLocation, consensusGroupIdList) -> - new TTableDeviceDeletionWithPatternAndFilterReq( - new ArrayList<>(consensusGroupIdList), - tableName, - ByteBuffer.wrap(patternBytes), - ByteBuffer.wrap(filterBytes)))) { - @Override - protected List processResponseOfOneDataNode( - final TDataNodeLocation dataNodeLocation, - final List consensusGroupIdList, - final TSStatus response) { - final List failedRegionList = new ArrayList<>(); - if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(response); - } else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - final List subStatusList = response.getSubStatus(); - for (int i = 0; i < subStatusList.size(); i++) { - if (subStatusList.get(i).getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successResult.add(subStatusList.get(i)); - } else { - failedRegionList.add(consensusGroupIdList.get(i)); - } - } - } else { - failedRegionList.addAll(consensusGroupIdList); - } - return failedRegionList; - } - - @Override - protected void onAllReplicasetFailure( - final TConsensusGroupId consensusGroupId, - final Set dataNodeLocationSet) { - setFailure( - new ProcedureException( - new MetadataException( - String.format( - "[%s] for %s.%s failed when construct black list for table because failed to execute in all replicaset of %s %s. Failure nodes: %s", - this.getClass().getSimpleName(), - database, + final DataNodeTSStatusTaskExecutor + deleteDevicesExecutor = + new DataNodeTSStatusTaskExecutor( + env, + relatedSchemaRegionGroup, + false, + CnToDnAsyncRequestType.CONSTRUCT_TABLE_DEVICE_BLACK_LIST, + ((dataNodeLocation, consensusGroupIdList) -> + new TTableDeviceDeletionWithPatternAndFilterReq( + new ArrayList<>(consensusGroupIdList), tableName, - consensusGroupId.type, - consensusGroupId.id, - dataNodeLocationSet)))); - interruptTask(); - } - }.execute(); + ByteBuffer.wrap(patternBytes), + ByteBuffer.wrap(filterBytes)))) { + @Override + protected List processResponseOfOneDataNode( + final TDataNodeLocation dataNodeLocation, + final List consensusGroupIdList, + final TSStatus response) { + return processResponseOfOneDataNodeWithSuccessResult( + dataNodeLocation, consensusGroupIdList, response); + } + + @Override + protected void onAllReplicasetFailure( + final TConsensusGroupId consensusGroupId, + final Set dataNodeLocationSet) { + setFailure( + new ProcedureException( + new MetadataException( + String.format( + "[%s] for %s.%s failed when construct black list for table because failed to execute in all replicaset of %s %s. Failures: %s", + this.getClass().getSimpleName(), + database, + tableName, + consensusGroupId.type, + consensusGroupId.id, + printFailureMap())))); + interruptTask(); + } + }; + deleteDevicesExecutor.execute(); setNextState(CONSTRUCT_BLACK_LIST); deletedDevicesNum = !isFailed() - ? successResult.stream() + ? deleteDevicesExecutor.getSuccessResult().stream() .mapToLong(resp -> Long.parseLong(resp.getMessage())) .reduce(Long::sum) .orElse(0L) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java new file mode 100644 index 0000000000000..d1286cc3587d6 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutorTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.utils.StatusUtils; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; + +public class DataNodeRegionTaskExecutorTest { + + @Test + public void testPrintFailureMap() { + final TestRegionTaskExecutor executor = new TestRegionTaskExecutor(); + executor.processResponseOfOneDataNode( + new TDataNodeLocation().setDataNodeId(0), + Collections.singletonList(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3)), + StatusUtils.OK); + executor.processResponseOfOneDataNode( + new TDataNodeLocation().setDataNodeId(0), + Arrays.asList( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), + new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)), + RpcUtils.getStatus( + Arrays.asList( + StatusUtils.OK, new TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode())))); + Assert.assertEquals("{DataNodeId: 0=[TSStatus(code:509)]}", executor.printFailureMap()); + + executor.processResponseOfOneDataNodeWithSuccessResult( + new TDataNodeLocation().setDataNodeId(0), + Arrays.asList( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), + new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)), + RpcUtils.getStatus( + Arrays.asList( + StatusUtils.OK, new TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode())))); + Assert.assertTrue(StatusUtils.OK == executor.getSuccessResult().get(0)); + } + + private static class TestRegionTaskExecutor extends DataNodeTSStatusTaskExecutor { + + private TestRegionTaskExecutor() { + super(new ConfigNodeProcedureEnv(null, null), null, false, null, null); + } + + @Override + protected void onAllReplicasetFailure( + final TConsensusGroupId consensusGroupId, + final Set dataNodeLocationSet) { + interruptTask(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index de92827457a76..261b4908a9061 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.schema.template.Template; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.commons.utils.MetadataUtils; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException; import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -86,6 +87,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -94,7 +97,7 @@ /** Schema write {@link PlanNode} visitor */ public class SchemaExecutionVisitor extends PlanVisitor { - private static final Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class); + private static Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class); @Override public TSStatus visitCreateTimeSeries( @@ -102,7 +105,7 @@ public TSStatus visitCreateTimeSeries( try { schemaRegion.createTimeSeries(node, -1); } catch (final MetadataException e) { - logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + logMetaDataException(String.format("%s: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME), e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -129,7 +132,7 @@ public TSStatus visitCreateAlignedTimeSeries( schemaRegion.createAlignedTimeSeries(node); } } catch (final MetadataException e) { - logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + logMetaDataException(String.format("%s: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME), e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -155,7 +158,8 @@ public TSStatus visitCreateMultiTimeSeries( ((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).setWithMerge(node.isGeneratedByPipe()); schemaRegion.createTimeSeries(createTimeSeriesPlan, -1); } catch (final MetadataException e) { - logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + logMetaDataException( + String.format("%s: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME), e); failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } } @@ -297,7 +301,7 @@ private void executeInternalCreateTimeSeries( RpcUtils.getStatus( e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath()))); } catch (final MetadataException e) { - logger.warn("{}: MetaData error: ", e.getMessage(), e); + logMetaDataException(String.format("%s: MetaData error: ", e.getMessage()), e); failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } } @@ -445,7 +449,7 @@ public TSStatus visitAlterTimeSeries( break; } } catch (MetadataException e) { - logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e); + logMetaDataException(String.format("%s: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME), e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } catch (IOException e) { logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e); @@ -468,7 +472,7 @@ public TSStatus visitActivateTemplate( schemaRegion.activateSchemaTemplate(node, template); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -496,7 +500,7 @@ public TSStatus visitBatchActivateTemplate( if (e.getErrorCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { alreadyActivatedDeviceList.add(entry.getKey()); } else { - logger.error(e.getMessage(), e); + logMetaDataException(e); statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } } @@ -504,7 +508,7 @@ public TSStatus visitBatchActivateTemplate( if (!alreadyActivatedDeviceList.isEmpty()) { final TemplateIsInUseException e = new TemplateIsInUseException(alreadyActivatedDeviceList.toString()); - logger.error(e.getMessage(), e); + logMetaDataException(e); statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } return statusList.isEmpty() ? RpcUtils.SUCCESS_STATUS : RpcUtils.getStatus(statusList); @@ -533,7 +537,7 @@ public TSStatus visitInternalBatchActivateTemplate( "Device Template has already been activated on path %s, there's no need to activate again.", entry.getKey())); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -552,7 +556,7 @@ public TSStatus visitConstructSchemaBlackList( : TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNumAndIsAllLogicalView.getLeft())); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -564,7 +568,7 @@ public TSStatus visitRollbackSchemaBlackList( schemaRegion.rollbackSchemaBlackList(node.getPatternTree()); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -576,7 +580,7 @@ public TSStatus visitDeleteTimeseries( schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree()); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -589,7 +593,7 @@ public TSStatus visitAlterEncodingCompressor( return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { if (e.getErrorCode() != TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { - logger.error(e.getMessage(), e); + logMetaDataException(e); } return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } @@ -603,7 +607,7 @@ public TSStatus visitPreDeactivateTemplate( TSStatusCode.SUCCESS_STATUS, String.valueOf(schemaRegion.constructSchemaBlackListWithTemplate(node))); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -615,7 +619,7 @@ public TSStatus visitRollbackPreDeactivateTemplate( schemaRegion.rollbackSchemaBlackListWithTemplate(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -627,7 +631,7 @@ public TSStatus visitDeactivateTemplate( schemaRegion.deactivateTemplateInBlackList(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -682,7 +686,7 @@ public TSStatus visitConstructLogicalViewBlackList( TSStatusCode.SUCCESS_STATUS, String.valueOf(schemaRegion.constructLogicalViewBlackList(node.getPatternTree()))); } catch (MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -694,7 +698,7 @@ public TSStatus visitRollbackLogicalViewBlackList( schemaRegion.rollbackLogicalViewBlackList(node.getPatternTree()); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -705,7 +709,7 @@ public TSStatus visitDeleteLogicalView(DeleteLogicalViewNode node, ISchemaRegion schemaRegion.deleteLogicalView(node.getPatternTree()); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -718,7 +722,7 @@ public TSStatus visitCreateOrUpdateTableDevice( schemaRegion.createOrUpdateTableDevice(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -730,7 +734,7 @@ public TSStatus visitTableDeviceAttributeUpdate( schemaRegion.updateTableDeviceAttribute(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -742,7 +746,7 @@ public TSStatus visitTableDeviceAttributeCommit( schemaRegion.commitUpdateAttribute(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -754,7 +758,7 @@ public TSStatus visitTableNodeLocationAdd( schemaRegion.addNodeLocation(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -766,7 +770,7 @@ public TSStatus visitDeleteTableDevice( schemaRegion.deleteTableDevice(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -778,7 +782,7 @@ public TSStatus visitTableAttributeColumnDrop( schemaRegion.dropTableAttribute(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -790,7 +794,7 @@ public TSStatus visitConstructTableDevicesBlackList( final long preDeletedNum = schemaRegion.constructTableDevicesBlackList(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNum)); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -802,7 +806,7 @@ public TSStatus visitRollbackTableDevicesBlackList( schemaRegion.rollbackTableDevicesBlackList(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -814,7 +818,7 @@ public TSStatus visitDeleteTableDevicesInBlackList( schemaRegion.deleteTableDevicesInBlackList(node); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (final MetadataException e) { - logger.error(e.getMessage(), e); + logMetaDataException(e); return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } @@ -851,7 +855,29 @@ public TSStatus visitPipeOperateSchemaQueueNode( } @Override - public TSStatus visitPlan(PlanNode node, ISchemaRegion context) { + public TSStatus visitPlan(final PlanNode node, final ISchemaRegion context) { return null; } + + public static void logMetaDataException( + final @Nonnull String message, final @Nonnull MetadataException e) { + if (e.isUserException()) { + logger.info(message); + } else { + logger.error(message, e); + } + } + + public static void logMetaDataException(final @Nonnull MetadataException e) { + if (e.isUserException()) { + logger.info(e.getMessage()); + } else { + logger.error(e.getMessage(), e); + } + } + + @TestOnly + public static void setLogger(final Logger logger) { + SchemaExecutionVisitor.logger = logger; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 7ddf1618223ab..230ed8330ca16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -978,7 +978,7 @@ public void alterEncodingCompressor(final AlterEncodingCompressorNode node) mTree.alterEncodingCompressor(pathPattern, node.getEncoding(), node.getCompressionType()); } if (!exist) { - throw new PathNotExistException(node.getPatternTree().getAllPathPatterns().toString(), false); + throw new PathNotExistException(node.getPatternTree().getAllPathPatterns().toString(), true); } writeToMLog(node); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java new file mode 100644 index 0000000000000..b36f585e11752 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitorTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.consensus.statemachine.schemaregion; + +import org.apache.iotdb.commons.exception.MetadataException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +public class SchemaExecutionVisitorTest { + @Test + public void testAuthLogger() { + SchemaExecutionVisitor.setLogger( + new Logger() { + @Override + public String getName() { + return null; + } + + @Override + public boolean isTraceEnabled() { + return false; + } + + @Override + public void trace(String msg) {} + + @Override + public void trace(String format, Object arg) {} + + @Override + public void trace(String format, Object arg1, Object arg2) {} + + @Override + public void trace(String format, Object... arguments) {} + + @Override + public void trace(String msg, Throwable t) {} + + @Override + public boolean isTraceEnabled(Marker marker) { + return false; + } + + @Override + public void trace(Marker marker, String msg) {} + + @Override + public void trace(Marker marker, String format, Object arg) {} + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void trace(Marker marker, String format, Object... argArray) {} + + @Override + public void trace(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public void debug(String msg) {} + + @Override + public void debug(String format, Object arg) {} + + @Override + public void debug(String format, Object arg1, Object arg2) {} + + @Override + public void debug(String format, Object... arguments) {} + + @Override + public void debug(String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled(Marker marker) { + return false; + } + + @Override + public void debug(Marker marker, String msg) {} + + @Override + public void debug(Marker marker, String format, Object arg) {} + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void debug(Marker marker, String format, Object... arguments) {} + + @Override + public void debug(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled() { + return true; + } + + @Override + public void info(String msg) {} + + @Override + public void info(String format, Object arg) {} + + @Override + public void info(String format, Object arg1, Object arg2) {} + + @Override + public void info(String format, Object... arguments) {} + + @Override + public void info(String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled(Marker marker) { + return true; + } + + @Override + public void info(Marker marker, String msg) {} + + @Override + public void info(Marker marker, String format, Object arg) {} + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void info(Marker marker, String format, Object... arguments) {} + + @Override + public void info(Marker marker, String msg, Throwable t) {} + + // Warn + @Override + public boolean isWarnEnabled() { + return true; + } + + @Override + public void warn(String msg) {} + + @Override + public void warn(String format, Object arg) {} + + @Override + public void warn(String format, Object... arguments) {} + + @Override + public void warn(String format, Object arg1, Object arg2) {} + + @Override + public void warn(String msg, Throwable t) {} + + @Override + public boolean isWarnEnabled(Marker marker) { + return false; + } + + @Override + public void warn(Marker marker, String msg) {} + + @Override + public void warn(Marker marker, String format, Object arg) {} + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void warn(Marker marker, String format, Object... arguments) {} + + @Override + public void warn(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isErrorEnabled() { + return true; + } + + @Override + public void error(String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isErrorEnabled(Marker marker) { + return true; + } + + @Override + public void error(Marker marker, String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(Marker marker, String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(Marker marker, String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void error(Marker marker, String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + }); + SchemaExecutionVisitor.logMetaDataException(new MetadataException("", true)); + SchemaExecutionVisitor.logMetaDataException("", new MetadataException("", true)); + try { + SchemaExecutionVisitor.logMetaDataException(new MetadataException("", false)); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + try { + SchemaExecutionVisitor.logMetaDataException(new MetadataException("", false)); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + SchemaExecutionVisitor.setLogger(LoggerFactory.getLogger(SchemaExecutionVisitor.class)); + } +}