From 534e4931ba0add083563d3a9c28ce50d7061868f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 14:48:15 +0800 Subject: [PATCH 1/5] Schema --- .../schema/PipeSchemaRegionSinkMetrics.java | 58 ++ .../PipeSchemaRegionWritePlanEventBatch.java | 526 ++++++++++++++++++ .../airgap/IoTDBSchemaRegionAirGapSink.java | 143 ++++- .../thrift/sync/IoTDBSchemaRegionSink.java | 151 ++++- .../PipeSchemaRegionSinkMetricsTest.java | 118 ++++ .../pipe/sink/PipeSchemaRegionSinkTest.java | 317 +++++++++++ ...peSchemaRegionWritePlanEventBatchTest.java | 464 +++++++++++++++ 7 files changed, 1747 insertions(+), 30 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java index c6e8ad5295070..4b2de4e7de0a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -56,6 +57,7 @@ public void bindTo(final AbstractMetricService metricService) { private void createMetrics(final String taskID) { createRate(taskID); + createHistogram(taskID); } private void createRate(final String taskID) { @@ -72,6 +74,38 @@ private void createRate(final String taskID) { String.valueOf(connector.getCreationTime()))); } + private void createHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + + final Histogram schemaBatchSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchSizeHistogram(schemaBatchSizeHistogram); + + final Histogram schemaBatchTimeIntervalHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram); + + final Histogram schemaBatchEventSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString()); + connector.setEventSizeHistogram(schemaBatchEventSizeHistogram); + } + @Override public void unbindFrom(final AbstractMetricService metricService) { ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister); @@ -83,6 +117,7 @@ public void unbindFrom(final AbstractMetricService metricService) { private void removeMetrics(final String taskID) { removeRate(taskID); + removeHistogram(taskID); } private void removeRate(final String taskID) { @@ -98,6 +133,29 @@ private void removeRate(final String taskID) { schemaRateMap.remove(taskID); } + private void removeHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString()); + } + //////////////////////////// Register & deregister (pipe integration) //////////////////////////// public void register(final PipeSinkSubtask pipeSinkSubtask) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java new file mode 100644 index 0000000000000..e03ac6b63efbb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup; +import org.apache.iotdb.metrics.impl.DoNothingHistogram; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY; + +public class PipeSchemaRegionWritePlanEventBatch implements AutoCloseable { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeSchemaRegionWritePlanEventBatch.class); + + private static final PlanNodeId EMPTY_PLAN_NODE_ID = new PlanNodeId(""); + + private final int maxDelayInMs; + private final long maxBatchSizeInBytes; + private final PipeMemoryBlock allocatedMemoryBlock; + + private final List events = new ArrayList<>(); + + private final Map> deviceMap = new HashMap<>(); + private final Map> templateActivationMap = new HashMap<>(); + + private BatchType batchType = BatchType.NONE; + private String pipeName; + private long creationTime; + + private long totalBufferSize = 0; + private long firstEventProcessingTime = Long.MIN_VALUE; + + private volatile boolean isClosed = false; + + private Histogram batchSizeHistogram = new DoNothingHistogram(); + private Histogram batchTimeIntervalHistogram = new DoNothingHistogram(); + private Histogram eventSizeHistogram = new DoNothingHistogram(); + + public PipeSchemaRegionWritePlanEventBatch(final PipeParameters parameters) { + final Integer requestMaxDelayInMillis = + parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, SINK_IOTDB_BATCH_DELAY_MS_KEY); + if (Objects.isNull(requestMaxDelayInMillis)) { + final int requestMaxDelayConfig = + parameters.getIntOrDefault( + Arrays.asList( + CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY, SINK_IOTDB_BATCH_DELAY_SECONDS_KEY), + CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE); + maxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE : requestMaxDelayConfig; + } else { + maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : requestMaxDelayInMillis; + } + + maxBatchSizeInBytes = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), + CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(maxBatchSizeInBytes); + } + + public synchronized boolean onEvent(final PipeSchemaRegionWritePlanEvent event) { + if (isClosed || !canBatch(event)) { + return false; + } + + if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1), event)) { + if (!event.increaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName())) { + LOGGER.warn("Cannot increase reference count for event: {}, ignore it in batch.", event); + return true; + } + + try { + if (Objects.isNull(pipeName)) { + pipeName = event.getPipeName(); + creationTime = event.getCreationTime(); + } + appendPlanNode(event.getPlanNode()); + totalBufferSize += event.getPlanNode().serializeToByteBuffer().limit(); + events.add(event); + } catch (final Exception e) { + event.decreaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName(), false); + throw e; + } + + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } + } + + return true; + } + + private boolean canBatch(final PipeSchemaRegionWritePlanEvent event) { + final BatchType eventBatchType = resolveBatchType(event.getPlanNode()); + if (eventBatchType == BatchType.NONE || containsNonEmptyProps(event.getPlanNode())) { + return false; + } + + if (events.isEmpty()) { + return !hasAlignmentConflict(event.getPlanNode()); + } + + return Objects.equals(pipeName, event.getPipeName()) + && creationTime == event.getCreationTime() + && batchType == eventBatchType + && !hasAlignmentConflict(event.getPlanNode()); + } + + private BatchType resolveBatchType(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + case CREATE_ALIGNED_TIME_SERIES: + case CREATE_MULTI_TIME_SERIES: + case INTERNAL_CREATE_TIME_SERIES: + case INTERNAL_CREATE_MULTI_TIMESERIES: + return BatchType.TIMESERIES; + case ACTIVATE_TEMPLATE: + case BATCH_ACTIVATE_TEMPLATE: + case INTERNAL_BATCH_ACTIVATE_TEMPLATE: + return BatchType.TEMPLATE_ACTIVATE; + default: + return BatchType.NONE; + } + } + + private boolean containsNonEmptyProps(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + return hasNonEmptyProps(((CreateTimeSeriesNode) planNode).getProps()); + case CREATE_MULTI_TIME_SERIES: + return ((CreateMultiTimeSeriesNode) planNode) + .getMeasurementGroupMap().values().stream() + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + case INTERNAL_CREATE_TIME_SERIES: + return hasNonEmptyProps(((InternalCreateTimeSeriesNode) planNode).getMeasurementGroup()); + case INTERNAL_CREATE_MULTI_TIMESERIES: + return ((InternalCreateMultiTimeSeriesNode) planNode) + .getDeviceMap().values().stream() + .map(Pair::getRight) + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + default: + return false; + } + } + + private static boolean hasNonEmptyProps(final MeasurementGroup measurementGroup) { + return Objects.nonNull(measurementGroup.getPropsList()) + && measurementGroup.getPropsList().stream() + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + } + + private static boolean hasNonEmptyProps(final Map props) { + return Objects.nonNull(props) && !props.isEmpty(); + } + + private boolean hasAlignmentConflict(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + return hasAlignmentConflict( + ((CreateTimeSeriesNode) planNode).getPath().getDevicePath(), false); + case CREATE_ALIGNED_TIME_SERIES: + return hasAlignmentConflict(((CreateAlignedTimeSeriesNode) planNode).getDevicePath(), true); + case CREATE_MULTI_TIME_SERIES: + return ((CreateMultiTimeSeriesNode) planNode) + .getMeasurementGroupMap().keySet().stream() + .anyMatch(devicePath -> hasAlignmentConflict(devicePath, false)); + case INTERNAL_CREATE_TIME_SERIES: + return hasAlignmentConflict( + ((InternalCreateTimeSeriesNode) planNode).getDevicePath(), + ((InternalCreateTimeSeriesNode) planNode).isAligned()); + case INTERNAL_CREATE_MULTI_TIMESERIES: + return ((InternalCreateMultiTimeSeriesNode) planNode) + .getDeviceMap().entrySet().stream() + .anyMatch( + entry -> hasAlignmentConflict(entry.getKey(), entry.getValue().getLeft())); + default: + return false; + } + } + + private boolean hasAlignmentConflict(final PartialPath devicePath, final boolean isAligned) { + final Pair existing = deviceMap.get(devicePath); + return Objects.nonNull(existing) && !Objects.equals(existing.getLeft(), isAligned); + } + + private void appendPlanNode(final PlanNode planNode) { + if (batchType == BatchType.NONE) { + batchType = resolveBatchType(planNode); + } + + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + appendCreateTimeSeriesNode((CreateTimeSeriesNode) planNode); + break; + case CREATE_ALIGNED_TIME_SERIES: + appendCreateAlignedTimeSeriesNode((CreateAlignedTimeSeriesNode) planNode); + break; + case CREATE_MULTI_TIME_SERIES: + appendCreateMultiTimeSeriesNode((CreateMultiTimeSeriesNode) planNode); + break; + case INTERNAL_CREATE_TIME_SERIES: + appendInternalCreateTimeSeriesNode((InternalCreateTimeSeriesNode) planNode); + break; + case INTERNAL_CREATE_MULTI_TIMESERIES: + appendInternalCreateMultiTimeSeriesNode((InternalCreateMultiTimeSeriesNode) planNode); + break; + case ACTIVATE_TEMPLATE: + appendActivateTemplateNode((ActivateTemplateNode) planNode); + break; + case BATCH_ACTIVATE_TEMPLATE: + appendBatchActivateTemplateNode((BatchActivateTemplateNode) planNode); + break; + case INTERNAL_BATCH_ACTIVATE_TEMPLATE: + appendInternalBatchActivateTemplateNode((InternalBatchActivateTemplateNode) planNode); + break; + default: + throw new IllegalArgumentException("Unsupported schema plan node " + planNode.getType()); + } + } + + private void appendCreateTimeSeriesNode(final CreateTimeSeriesNode node) { + appendMeasurement( + node.getPath().getDevicePath(), + false, + node.getPath().getMeasurement(), + node.getDataType(), + node.getEncoding(), + node.getCompressor(), + node.getAlias(), + node.getTags(), + node.getAttributes()); + } + + private void appendCreateAlignedTimeSeriesNode(final CreateAlignedTimeSeriesNode node) { + for (int i = 0; i < node.getMeasurements().size(); ++i) { + appendMeasurement( + node.getDevicePath(), + true, + node.getMeasurements().get(i), + node.getDataTypes().get(i), + node.getEncodings().get(i), + node.getCompressors().get(i), + Objects.nonNull(node.getAliasList()) ? node.getAliasList().get(i) : null, + Objects.nonNull(node.getTagsList()) ? node.getTagsList().get(i) : null, + Objects.nonNull(node.getAttributesList()) ? node.getAttributesList().get(i) : null); + } + } + + private void appendCreateMultiTimeSeriesNode(final CreateMultiTimeSeriesNode node) { + node.getMeasurementGroupMap() + .forEach( + (devicePath, measurementGroup) -> + appendMeasurementGroup(devicePath, false, measurementGroup)); + } + + private void appendInternalCreateTimeSeriesNode(final InternalCreateTimeSeriesNode node) { + appendMeasurementGroup(node.getDevicePath(), node.isAligned(), node.getMeasurementGroup()); + } + + private void appendInternalCreateMultiTimeSeriesNode( + final InternalCreateMultiTimeSeriesNode node) { + node.getDeviceMap() + .forEach( + (devicePath, isAlignedAndMeasurementGroup) -> + appendMeasurementGroup( + devicePath, + isAlignedAndMeasurementGroup.getLeft(), + isAlignedAndMeasurementGroup.getRight())); + } + + private void appendMeasurementGroup( + final PartialPath devicePath, + final boolean isAligned, + final MeasurementGroup measurementGroup) { + for (int i = 0; i < measurementGroup.size(); ++i) { + appendMeasurement( + devicePath, + isAligned, + measurementGroup.getMeasurements().get(i), + measurementGroup.getDataTypes().get(i), + measurementGroup.getEncodings().get(i), + measurementGroup.getCompressors().get(i), + Objects.nonNull(measurementGroup.getAliasList()) + ? measurementGroup.getAliasList().get(i) + : null, + Objects.nonNull(measurementGroup.getTagsList()) + ? measurementGroup.getTagsList().get(i) + : null, + Objects.nonNull(measurementGroup.getAttributesList()) + ? measurementGroup.getAttributesList().get(i) + : null); + } + } + + private void appendMeasurement( + final PartialPath devicePath, + final boolean isAligned, + final String measurement, + final TSDataType dataType, + final TSEncoding encoding, + final CompressionType compressor, + final String alias, + final Map tags, + final Map attributes) { + final MeasurementGroup group = + deviceMap + .computeIfAbsent(devicePath, key -> new Pair<>(isAligned, new MeasurementGroup())) + .getRight(); + if (group.addMeasurement(measurement, dataType, encoding, compressor)) { + group.addAlias(alias); + group.addTags(tags); + group.addAttributes(attributes); + } + } + + private void appendActivateTemplateNode(final ActivateTemplateNode node) { + templateActivationMap.putIfAbsent( + node.getActivatePath(), new Pair<>(node.getTemplateId(), node.getTemplateSetLevel())); + } + + private void appendBatchActivateTemplateNode(final BatchActivateTemplateNode node) { + node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent); + } + + private void appendInternalBatchActivateTemplateNode( + final InternalBatchActivateTemplateNode node) { + node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent); + } + + public synchronized boolean shouldEmit() { + if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) { + return false; + } + return totalBufferSize >= maxBatchSizeInBytes + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + } + + public synchronized void recordBatchMetrics() { + if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) { + return; + } + batchTimeIntervalHistogram.update(System.currentTimeMillis() - firstEventProcessingTime); + batchSizeHistogram.update(totalBufferSize); + eventSizeHistogram.update(events.size()); + } + + public synchronized PlanNode toPlanNode() { + switch (batchType) { + case TIMESERIES: + return new InternalCreateMultiTimeSeriesNode(EMPTY_PLAN_NODE_ID, new HashMap<>(deviceMap)); + case TEMPLATE_ACTIVATE: + return new BatchActivateTemplateNode( + EMPTY_PLAN_NODE_ID, new HashMap<>(templateActivationMap)); + default: + throw new IllegalStateException("Cannot build schema batch plan node from empty batch."); + } + } + + public synchronized void onSuccess() { + events.clear(); + deviceMap.clear(); + templateActivationMap.clear(); + batchType = BatchType.NONE; + pipeName = null; + creationTime = 0; + totalBufferSize = 0; + firstEventProcessingTime = Long.MIN_VALUE; + } + + public synchronized void decreaseEventsReferenceCount( + final String holderMessage, final boolean shouldReport) { + events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + final boolean removed = + events.removeIf( + event -> { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { + event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()); + return true; + } + return false; + }); + if (removed) { + rebuildFromEvents(); + } + } + + private void rebuildFromEvents() { + deviceMap.clear(); + templateActivationMap.clear(); + batchType = BatchType.NONE; + pipeName = null; + creationTime = 0; + totalBufferSize = 0; + + if (events.isEmpty()) { + firstEventProcessingTime = Long.MIN_VALUE; + return; + } + + // After a partial discard, the enqueue timestamp of the oldest remaining event is unknown. + // Reset the emit window conservatively to avoid flushing immediately because of removed events. + firstEventProcessingTime = System.currentTimeMillis(); + batchType = resolveBatchType(((PipeSchemaRegionWritePlanEvent) events.get(0)).getPlanNode()); + + for (final EnrichedEvent event : events) { + final PipeSchemaRegionWritePlanEvent schemaEvent = (PipeSchemaRegionWritePlanEvent) event; + if (Objects.isNull(pipeName)) { + pipeName = schemaEvent.getPipeName(); + creationTime = schemaEvent.getCreationTime(); + } + appendPlanNode(schemaEvent.getPlanNode()); + totalBufferSize += schemaEvent.getPlanNode().serializeToByteBuffer().limit(); + } + } + + public synchronized boolean isEmpty() { + return events.isEmpty(); + } + + public synchronized int size() { + return events.size(); + } + + public synchronized String getPipeName() { + return pipeName; + } + + public synchronized long getCreationTime() { + return creationTime; + } + + public void setBatchSizeHistogram(final Histogram batchSizeHistogram) { + if (Objects.nonNull(batchSizeHistogram)) { + this.batchSizeHistogram = batchSizeHistogram; + } + } + + public void setBatchTimeIntervalHistogram(final Histogram batchTimeIntervalHistogram) { + if (Objects.nonNull(batchTimeIntervalHistogram)) { + this.batchTimeIntervalHistogram = batchTimeIntervalHistogram; + } + } + + public void setEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(eventSizeHistogram)) { + this.eventSizeHistogram = eventSizeHistogram; + } + } + + @Override + public synchronized void close() { + isClosed = true; + events.forEach( + event -> event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName())); + events.clear(); + deviceMap.clear(); + templateActivationMap.clear(); + allocatedMemoryBlock.close(); + } + + private enum BatchType { + NONE, + TIMESERIES, + TEMPLATE_ACTIVATE + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index bc056857c17c1..874acdd6de64d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -24,11 +24,15 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -49,6 +53,19 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSchemaRegionAirGapSink.class); + private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch; + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + super.customize(parameters, configuration); + + if (isTabletBatchModeEnabled) { + schemaRegionWritePlanEventBatch = new PipeSchemaRegionWritePlanEventBatch(parameters); + } + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( @@ -68,12 +85,21 @@ public void transfer(final Event event) throws Exception { try { if (event instanceof PipeSchemaRegionWritePlanEvent) { - doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event); + if (isTabletBatchModeEnabled && Objects.nonNull(schemaRegionWritePlanEventBatch)) { + doTransferWithBatch(socket, (PipeSchemaRegionWritePlanEvent) event); + } else { + doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event); + } } else if (event instanceof PipeSchemaRegionSnapshotEvent) { + flushBatchedEventsIfNecessary(socket); doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event); - } else if (!(event instanceof PipeHeartbeatEvent)) { - LOGGER.warn( - "IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", event); + } else { + flushBatchedEventsIfNecessary(socket); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn( + "IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", + event); + } } } catch (final IOException e) { isSocketAlive.set(socketIndex, false); @@ -86,6 +112,43 @@ public void transfer(final Event event) throws Exception { } } + private void doTransferWithBatch( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (schemaRegionWritePlanEventBatch.onEvent(event)) { + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(socket); + } + return; + } + + if (!schemaRegionWritePlanEventBatch.isEmpty()) { + flushBatchedEventsIfNecessary(socket); + if (schemaRegionWritePlanEventBatch.onEvent(event)) { + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(socket); + } + return; + } + } + + doTransferWrapper(socket, event); + } + + private void flushBatchedEventsIfNecessary(final AirGapSocket socket) + throws PipeException, IOException { + if (Objects.isNull(schemaRegionWritePlanEventBatch) + || schemaRegionWritePlanEventBatch.isEmpty()) { + return; + } + + schemaRegionWritePlanEventBatch.recordBatchMetrics(); + doTransfer(socket, schemaRegionWritePlanEventBatch); + schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount( + IoTDBSchemaRegionAirGapSink.class.getName(), true); + schemaRegionWritePlanEventBatch.onSuccess(); + } + private void doTransferWrapper( final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) @@ -107,21 +170,39 @@ private void doTransfer( final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException, IOException { - if (!send( + doTransfer( + socket, + pipeSchemaRegionWritePlanEvent.getPlanNode(), pipeSchemaRegionWritePlanEvent.getPipeName(), pipeSchemaRegionWritePlanEvent.getCreationTime(), - socket, - PipeTransferPlanNodeReq.toTPipeTransferBytes( - pipeSchemaRegionWritePlanEvent.getPlanNode()))) { + pipeSchemaRegionWritePlanEvent.toString()); + } + + private void doTransfer( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEventBatch batch) + throws PipeException, IOException { + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode = + batch.toPlanNode(); + doTransfer(socket, planNode, batch.getPipeName(), batch.getCreationTime(), planNode.toString()); + } + + private void doTransfer( + final AirGapSocket socket, + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode, + final String pipeName, + final long creationTime, + final String eventDescription) + throws PipeException, IOException { + if (!send( + pipeName, creationTime, socket, PipeTransferPlanNodeReq.toTPipeTransferBytes(planNode))) { final String errorMessage = String.format( - "Transfer data node write plan %s error. Socket: %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket); + "Transfer data node write plan %s error. Socket: %s.", planNode.getType(), socket); receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionWritePlanEvent.toString(), + eventDescription, true); } } @@ -215,4 +296,44 @@ protected byte[] getTransferMultiFilePieceBytes( final String fileName, final long position, final byte[] payLoad) throws IOException { return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferBytes(fileName, position, payLoad); } + + @Override + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.discardEventsOfPipe( + pipeNameToDrop, creationTimeToDrop, regionId); + } + } + + @Override + public void close() { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.close(); + } + super.close(); + } + + @Override + public void setTabletBatchSizeHistogram(final Histogram tabletBatchSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram( + final Histogram tabletBatchTimeIntervalHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram( + tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index 15a72d1193aec..d8965dce0dcd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -25,11 +25,15 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -54,6 +58,19 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSchemaRegionSink.class); + private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch; + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + super.customize(parameters, configuration); + + if (isTabletBatchModeEnabled) { + schemaRegionWritePlanEventBatch = new PipeSchemaRegionWritePlanEventBatch(parameters); + } + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( @@ -69,13 +86,56 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { if (event instanceof PipeSchemaRegionWritePlanEvent) { - doTransferWrapper((PipeSchemaRegionWritePlanEvent) event); + if (isTabletBatchModeEnabled && Objects.nonNull(schemaRegionWritePlanEventBatch)) { + doTransferWithBatch((PipeSchemaRegionWritePlanEvent) event); + } else { + doTransferWrapper((PipeSchemaRegionWritePlanEvent) event); + } } else if (event instanceof PipeSchemaRegionSnapshotEvent) { + flushBatchedEventsIfNecessary(); doTransferWrapper((PipeSchemaRegionSnapshotEvent) event); - } else if (!(event instanceof PipeHeartbeatEvent)) { - LOGGER.warn( - "IoTDBSchemaRegionConnector does not support transferring generic event: {}.", event); + } else { + flushBatchedEventsIfNecessary(); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn( + "IoTDBSchemaRegionConnector does not support transferring generic event: {}.", event); + } + } + } + + private void doTransferWithBatch(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (schemaRegionWritePlanEventBatch.onEvent(event)) { + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(); + } + return; } + + if (!schemaRegionWritePlanEventBatch.isEmpty()) { + flushBatchedEventsIfNecessary(); + if (schemaRegionWritePlanEventBatch.onEvent(event)) { + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(); + } + return; + } + } + + doTransferWrapper(event); + } + + private void flushBatchedEventsIfNecessary() throws PipeException { + if (Objects.isNull(schemaRegionWritePlanEventBatch) + || schemaRegionWritePlanEventBatch.isEmpty()) { + return; + } + + schemaRegionWritePlanEventBatch.recordBatchMetrics(); + doTransfer(schemaRegionWritePlanEventBatch); + schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount( + IoTDBSchemaRegionSink.class.getName(), true); + schemaRegionWritePlanEventBatch.onSuccess(); } private void doTransferWrapper( @@ -95,43 +155,56 @@ private void doTransferWrapper( private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException { + doTransfer( + pipeSchemaRegionWritePlanEvent.getPlanNode(), + pipeSchemaRegionWritePlanEvent.getPipeName(), + pipeSchemaRegionWritePlanEvent.getCreationTime(), + pipeSchemaRegionWritePlanEvent.getPlanNode().toString()); + LOGGER.info("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent); + } + + private void doTransfer(final PipeSchemaRegionWritePlanEventBatch batch) throws PipeException { + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode = + batch.toPlanNode(); + doTransfer(planNode, batch.getPipeName(), batch.getCreationTime(), planNode.toString()); + LOGGER.info("Successfully transferred batched schema events, batch size {}.", batch.size()); + } + + private void doTransfer( + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode, + final String pipeName, + final long creationTime, + final String eventDescription) + throws PipeException { final Pair clientAndStatus = clientManager.getClient(); final TPipeTransferResp resp; try { final TPipeTransferReq req = - compressIfNeeded( - PipeTransferPlanNodeReq.toTPipeTransferReq( - pipeSchemaRegionWritePlanEvent.getPlanNode())); + compressIfNeeded(PipeTransferPlanNodeReq.toTPipeTransferReq(planNode)); rateLimitIfNeeded( - pipeSchemaRegionWritePlanEvent.getPipeName(), - pipeSchemaRegionWritePlanEvent.getCreationTime(), - clientAndStatus.getLeft().getEndPoint(), - req.getBody().length); + pipeName, creationTime, clientAndStatus.getLeft().getEndPoint(), req.getBody().length); resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( String.format( "Network error when transfer schema region write plan %s, because %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), e.getMessage()), + planNode.getType(), e.getMessage()), e); } final TSStatus status = resp.getStatus(); - // Only handle the failed statuses to avoid string format performance overhead - if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && resp.getStatus().getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { receiverStatusHandler.handle( status, String.format( "Transfer data node write plan %s error, result status %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status), - pipeSchemaRegionWritePlanEvent.getPlanNode().toString(), + planNode.getType(), status), + eventDescription, true); } - - LOGGER.info("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent); } private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) @@ -245,6 +318,46 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq( return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } + @Override + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.discardEventsOfPipe( + pipeNameToDrop, creationTimeToDrop, regionId); + } + } + + @Override + public void close() { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.close(); + } + super.close(); + } + + @Override + public void setTabletBatchSizeHistogram(final Histogram tabletBatchSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram( + final Histogram tabletBatchTimeIntervalHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram( + tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram); + } + } + @Override protected void mayLimitRateAndRecordIO(final long requiredBytes) { // Do nothing diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java new file mode 100644 index 0000000000000..d61fda93be51a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.metric.schema; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.metrics.type.Rate; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipeSchemaRegionSinkMetricsTest { + + @Test + public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exception { + final String taskId = "schema-task-" + System.nanoTime(); + boolean deregistered = false; + final AbstractMetricService metricService = Mockito.mock(AbstractMetricService.class); + final PipeSinkSubtask subtask = Mockito.mock(PipeSinkSubtask.class); + final Rate rate = Mockito.mock(Rate.class); + final Histogram batchSizeHistogram = Mockito.mock(Histogram.class); + final Histogram batchTimeHistogram = Mockito.mock(Histogram.class); + final Histogram eventSizeHistogram = Mockito.mock(Histogram.class); + + when(subtask.getTaskID()).thenReturn(taskId); + when(subtask.getAttributeSortedString()).thenReturn("schema_test"); + when(subtask.getCreationTime()).thenReturn(1L); + when(metricService.getOrCreateRate(anyString(), any(MetricLevel.class), any(String[].class))) + .thenReturn(rate); + when(metricService.getOrCreateHistogram( + anyString(), any(MetricLevel.class), any(String[].class))) + .thenReturn(batchSizeHistogram, batchTimeHistogram, eventSizeHistogram); + + final PipeSchemaRegionSinkMetrics metrics = PipeSchemaRegionSinkMetrics.getInstance(); + + final Field metricServiceField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("metricService"); + metricServiceField.setAccessible(true); + metricServiceField.set(metrics, null); + + try { + metrics.register(subtask); + metrics.bindTo(metricService); + + verify(subtask).setTabletBatchSizeHistogram(batchSizeHistogram); + verify(subtask).setTabletBatchTimeIntervalHistogram(batchTimeHistogram); + verify(subtask).setEventSizeHistogram(eventSizeHistogram); + + metrics.deregister(taskId); + + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + Tag.NAME.toString(), + "schema_test"); + verify(metricService) + .remove( + MetricType.RATE, + Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + deregistered = true; + } finally { + if (!deregistered) { + metrics.deregister(taskId); + } + metricServiceField.set(metrics, null); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java new file mode 100644 index 0000000000000..d6caae2503af7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; +import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink; +import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashMap; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipeSchemaRegionSinkTest { + + @Test + public void testSyncSinkFlushesBatchedEventsOnHeartbeat() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent event = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + + sink.transfer(event); + Assert.assertFalse(event.isReleased()); + verify(client, never()).pipeTransfer(any(TPipeTransferReq.class)); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(event.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testSyncSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent batchedEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent standaloneEvent = + createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L); + + sink.transfer(batchedEvent); + sink.transfer(standaloneEvent); + + verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(batchedEvent.isReleased()); + Assert.assertTrue(standaloneEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent event = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + + sink.transfer(event); + Assert.assertFalse(event.isReleased()); + Assert.assertEquals(0, sink.getSendCount()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + Assert.assertEquals(1, sink.getSendCount()); + Assert.assertTrue(event.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testAirGapSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent batchedEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent standaloneEvent = + createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L); + + sink.transfer(batchedEvent); + sink.transfer(standaloneEvent); + + Assert.assertEquals(2, sink.getSendCount()); + Assert.assertTrue(batchedEvent.isReleased()); + Assert.assertTrue(standaloneEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + private PipeParameters createParameters() { + return new PipeParameters( + new HashMap() { + { + put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "100000"); + put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576"); + } + }); + } + + private void enableBatching(final Object sink) throws Exception { + setField(sink, "isTabletBatchModeEnabled", true); + setField( + sink, + "schemaRegionWritePlanEventBatch", + new PipeSchemaRegionWritePlanEventBatch(createParameters())); + } + + private PipeSchemaRegionWritePlanEventBatch getBatch(final Object sink) throws Exception { + return (PipeSchemaRegionWritePlanEventBatch) + getFieldValue(sink, "schemaRegionWritePlanEventBatch"); + } + + private PipeSchemaRegionWritePlanEvent createBatchableEvent( + final String path, final String pipeName, final long creationTime) throws Exception { + return new PipeSchemaRegionWritePlanEvent( + new CreateTimeSeriesNode( + new org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path), + new org.apache.iotdb.commons.path.MeasurementPath(path), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + pipeName, + creationTime, + null, + null, + null, + null, + null, + null, + true, + false); + } + + private PipeSchemaRegionWritePlanEvent createNonBatchableEvent( + final String path, final String pipeName, final long creationTime) throws Exception { + return new PipeSchemaRegionWritePlanEvent( + new CreateTimeSeriesNode( + new org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path + "-p"), + new org.apache.iotdb.commons.path.MeasurementPath(path), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + java.util.Collections.singletonMap("prop", "v1"), + null, + null, + null), + pipeName, + creationTime, + null, + null, + null, + null, + null, + null, + true, + false); + } + + private TPipeTransferResp createSuccessResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("success")); + return resp; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + Field field = findField(target.getClass(), fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private Object getFieldValue(final Object target, final String fieldName) throws Exception { + Field field = findField(target.getClass(), fieldName); + field.setAccessible(true); + return field.get(target); + } + + private Field findField(final Class clazz, final String fieldName) + throws NoSuchFieldException { + Class current = clazz; + while (current != null) { + try { + return current.getDeclaredField(fieldName); + } catch (NoSuchFieldException ignored) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } + + private static class TestIoTDBSchemaRegionSyncSink extends IoTDBSchemaRegionSink { + + @Override + public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) { + return req; + } + + @Override + public void rateLimitIfNeeded( + final String pipeName, + final long creationTime, + final TEndPoint endPoint, + final long bytesLength) { + // Do nothing in tests. + } + } + + private static class TestIoTDBSchemaRegionAirGapSink extends IoTDBSchemaRegionAirGapSink { + + private int sendCount = 0; + + private TestIoTDBSchemaRegionAirGapSink() { + sockets.add(new AirGapSocket("127.0.0.1", 6667)); + isSocketAlive.add(true); + } + + @Override + protected int nextSocketIndex() { + return 0; + } + + @Override + protected boolean send( + final String pipeName, + final long creationTime, + final AirGapSocket socket, + final byte[] bytes) { + sendCount++; + return true; + } + + private int getSendCount() { + return sendCount; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java new file mode 100644 index 0000000000000..2f31fdb41f796 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class PipeSchemaRegionWritePlanEventBatchTest { + + @Test + public void testBatchTimeSeriesEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + Collections.singletonMap("tag", "v1"), + Collections.singletonMap("attr", "a1"), + "alias1"), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateAlignedTimeSeriesNode( + new PlanNodeId("2"), + new PartialPath("root.db.d2"), + Arrays.asList("s1", "s2"), + Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE), + Arrays.asList(TSEncoding.RLE, TSEncoding.GORILLA), + Arrays.asList(CompressionType.SNAPPY, CompressionType.ZSTD), + Arrays.asList("alias2", null), + Arrays.asList(Collections.singletonMap("tag", "v2"), null), + Arrays.asList(Collections.singletonMap("attr", "a2"), null)), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode); + + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(2, deviceMap.size()); + + final Pair d1Group = deviceMap.get(new PartialPath("root.db.d1")); + Assert.assertNotNull(d1Group); + Assert.assertFalse(d1Group.getLeft()); + Assert.assertEquals(Collections.singletonList("s1"), d1Group.getRight().getMeasurements()); + Assert.assertEquals(Collections.singletonList("alias1"), d1Group.getRight().getAliasList()); + Assert.assertEquals( + Collections.singletonList(Collections.singletonMap("tag", "v1")), + d1Group.getRight().getTagsList()); + Assert.assertEquals( + Collections.singletonList(Collections.singletonMap("attr", "a1")), + d1Group.getRight().getAttributesList()); + + final Pair d2Group = deviceMap.get(new PartialPath("root.db.d2")); + Assert.assertNotNull(d2Group); + Assert.assertTrue(d2Group.getLeft()); + Assert.assertEquals(Arrays.asList("s1", "s2"), d2Group.getRight().getMeasurements()); + Assert.assertEquals(Arrays.asList("alias2", null), d2Group.getRight().getAliasList()); + } + } + + @Test + public void testBatchAdditionalTimeSeriesNodeTypes() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + final Map createMultiMap = new HashMap<>(); + createMultiMap.put(new PartialPath("root.db.d1"), createMeasurementGroup("s1", "alias1")); + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateMultiTimeSeriesNode(new PlanNodeId("1"), createMultiMap), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalCreateTimeSeriesNode( + new PlanNodeId("2"), + new PartialPath("root.db.d2"), + createMeasurementGroup("s2", "alias2"), + true), + "pipeA", + 1L))); + + final Map> internalCreateMultiMap = + new HashMap<>(); + internalCreateMultiMap.put( + new PartialPath("root.db.d3"), new Pair<>(false, createMeasurementGroup("s3", "alias3"))); + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalCreateMultiTimeSeriesNode( + new PlanNodeId("3"), internalCreateMultiMap), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode); + + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(3, deviceMap.size()); + Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d1")).getLeft()); + Assert.assertTrue(deviceMap.get(new PartialPath("root.db.d2")).getLeft()); + Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d3")).getLeft()); + Assert.assertEquals( + Collections.singletonList("s2"), + deviceMap.get(new PartialPath("root.db.d2")).getRight().getMeasurements()); + Assert.assertEquals( + Collections.singletonList("alias3"), + deviceMap.get(new PartialPath("root.db.d3")).getRight().getAliasList()); + } + } + + @Test + public void testBatchTemplateActivationEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertTrue( + batch.onEvent( + createEvent( + new ActivateTemplateNode( + new PlanNodeId("1"), new PartialPath("root.db.d1"), 1, 10), + "pipeA", + 1L))); + + final Map> templateActivationMap = new HashMap<>(); + templateActivationMap.put(new PartialPath("root.db.d2"), new Pair<>(2, 20)); + Assert.assertTrue( + batch.onEvent( + createEvent( + new BatchActivateTemplateNode(new PlanNodeId("2"), templateActivationMap), + "pipeA", + 1L))); + + final Map> internalTemplateActivationMap = + new HashMap<>(); + internalTemplateActivationMap.put(new PartialPath("root.db.d3"), new Pair<>(3, 30)); + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalBatchActivateTemplateNode( + new PlanNodeId("3"), internalTemplateActivationMap), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof BatchActivateTemplateNode); + + final Map> batchedMap = + ((BatchActivateTemplateNode) planNode).getTemplateActivationMap(); + Assert.assertEquals(3, batchedMap.size()); + Assert.assertEquals(new Pair<>(10, 1), batchedMap.get(new PartialPath("root.db.d1"))); + Assert.assertEquals(new Pair<>(2, 20), batchedMap.get(new PartialPath("root.db.d2"))); + Assert.assertEquals(new Pair<>(3, 30), batchedMap.get(new PartialPath("root.db.d3"))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("4"), + new MeasurementPath("root.db.d4.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L))); + } + } + + @Test + public void testRejectDifferentPipePropsAndAlignmentConflict() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + Collections.singletonMap("prop", "v1"), + null, + null, + null), + "pipeA", + 1L))); + + final Map measurementGroupMapWithProps = new HashMap<>(); + measurementGroupMapWithProps.put( + new PartialPath("root.db.d2"), createMeasurementGroupWithProps("s1")); + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateMultiTimeSeriesNode(new PlanNodeId("2"), measurementGroupMapWithProps), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("3"), + new MeasurementPath("root.db.d3.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateAlignedTimeSeriesNode( + new PlanNodeId("4"), + new PartialPath("root.db.d3"), + Collections.singletonList("s2"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(TSEncoding.RLE), + Collections.singletonList(CompressionType.SNAPPY), + null, + null, + null), + "pipeA", + 1L))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("5"), + new MeasurementPath("root.db.d4.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeB", + 1L))); + } + } + + @Test + public void testDiscardEventsOfPipeRebuildsBatchAndResetsEmitWindow() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(10000, 1048576))) { + final PipeSchemaRegionWritePlanEvent removedEvent = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L, + 1); + final PipeSchemaRegionWritePlanEvent remainingEvent = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("2"), + new MeasurementPath("root.db.d2.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L, + 2); + + Assert.assertTrue(batch.onEvent(removedEvent)); + Assert.assertTrue(batch.onEvent(remainingEvent)); + + setField(batch, "firstEventProcessingTime", System.currentTimeMillis() - 20000); + Assert.assertTrue(batch.shouldEmit()); + + batch.discardEventsOfPipe("pipeA", 1L, 1); + + Assert.assertTrue(removedEvent.isReleased()); + Assert.assertFalse(remainingEvent.isReleased()); + Assert.assertEquals(1, batch.size()); + Assert.assertEquals("pipeA", batch.getPipeName()); + Assert.assertEquals(1L, batch.getCreationTime()); + Assert.assertFalse(batch.shouldEmit()); + + final PlanNode planNode = batch.toPlanNode(); + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(1, deviceMap.size()); + Assert.assertTrue(deviceMap.containsKey(new PartialPath("root.db.d2"))); + } + } + + @Test + public void testRecordMetricsAndCloseReleaseEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + final Histogram batchSizeHistogram = Mockito.mock(Histogram.class); + final Histogram batchTimeIntervalHistogram = Mockito.mock(Histogram.class); + final Histogram eventSizeHistogram = Mockito.mock(Histogram.class); + batch.setBatchSizeHistogram(batchSizeHistogram); + batch.setBatchTimeIntervalHistogram(batchTimeIntervalHistogram); + batch.setEventSizeHistogram(eventSizeHistogram); + + final PipeSchemaRegionWritePlanEvent event = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L); + + Assert.assertTrue(batch.onEvent(event)); + batch.recordBatchMetrics(); + + verify(batchTimeIntervalHistogram, times(1)).update(anyLong()); + verify(batchSizeHistogram, times(1)).update(anyLong()); + verify(eventSizeHistogram, times(1)).update(1L); + + batch.close(); + Assert.assertTrue(event.isReleased()); + } + } + + private PipeParameters createParameters(final int delayInMs, final long batchSizeInBytes) { + return new PipeParameters( + new HashMap() { + { + put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, String.valueOf(delayInMs)); + put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, String.valueOf(batchSizeInBytes)); + } + }); + } + + private MeasurementGroup createMeasurementGroup(final String measurement, final String alias) { + final MeasurementGroup measurementGroup = new MeasurementGroup(); + measurementGroup.addMeasurement( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4); + measurementGroup.addAlias(alias); + measurementGroup.addTags(Collections.singletonMap("tag", alias)); + measurementGroup.addAttributes(Collections.singletonMap("attr", alias)); + return measurementGroup; + } + + private MeasurementGroup createMeasurementGroupWithProps(final String measurement) { + final MeasurementGroup measurementGroup = new MeasurementGroup(); + measurementGroup.addMeasurement( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4); + measurementGroup.addProps(Collections.singletonMap("prop", "v1")); + return measurementGroup; + } + + private PipeSchemaRegionWritePlanEvent createEvent( + final PlanNode planNode, final String pipeName, final long creationTime) { + return createEvent(planNode, pipeName, creationTime, -1); + } + + private PipeSchemaRegionWritePlanEvent createEvent( + final PlanNode planNode, final String pipeName, final long creationTime, final int regionId) { + final PipeSchemaRegionWritePlanEvent event = + new PipeSchemaRegionWritePlanEvent( + planNode, pipeName, creationTime, null, null, null, null, null, null, true, false); + event.setCommitterKeyAndCommitId(new CommitterKey(pipeName, creationTime, regionId, -1), 1L); + return event; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} From 4b7eb11ff7f087174869ccceb2cabe5811e74429 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 15:39:15 +0800 Subject: [PATCH 2/5] fix --- .../pipe/metric/schema/PipeSchemaRegionSinkMetrics.java | 8 ++++---- .../metric/schema/PipeSchemaRegionSinkMetricsTest.java | 4 ++-- .../apache/iotdb/commons/service/metric/enums/Metric.java | 2 ++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java index 4b2de4e7de0a0..3148a7c67ac0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java @@ -79,7 +79,7 @@ private void createHistogram(final String taskID) { final Histogram schemaBatchSizeHistogram = metricService.getOrCreateHistogram( - Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), connector.getAttributeSortedString(), @@ -89,7 +89,7 @@ private void createHistogram(final String taskID) { final Histogram schemaBatchTimeIntervalHistogram = metricService.getOrCreateHistogram( - Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), connector.getAttributeSortedString(), @@ -137,14 +137,14 @@ private void removeHistogram(final String taskID) { final PipeSinkSubtask connector = connectorMap.get(taskID); metricService.remove( MetricType.HISTOGRAM, - Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), Tag.NAME.toString(), connector.getAttributeSortedString(), Tag.CREATION_TIME.toString(), String.valueOf(connector.getCreationTime())); metricService.remove( MetricType.HISTOGRAM, - Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), Tag.NAME.toString(), connector.getAttributeSortedString(), Tag.CREATION_TIME.toString(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java index d61fda93be51a..26d77c328f27c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java @@ -80,7 +80,7 @@ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exceptio verify(metricService) .remove( MetricType.HISTOGRAM, - Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), Tag.NAME.toString(), "schema_test", Tag.CREATION_TIME.toString(), @@ -88,7 +88,7 @@ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exceptio verify(metricService) .remove( MetricType.HISTOGRAM, - Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), Tag.NAME.toString(), "schema_test", Tag.CREATION_TIME.toString(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index ec16c181e618b..3305191b5d3ad 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -152,8 +152,10 @@ public enum Metric { UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"), UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"), PIPE_INSERT_NODE_BATCH_SIZE("pipe_insert_node_batch_size"), + PIPE_SCHEMA_BATCH_SIZE("pipe_schema_batch_size"), PIPE_TSFILE_BATCH_SIZE("pipe_tsfile_batch_size"), PIPE_INSERT_NODE_BATCH_TIME_COST("pipe_insert_node_batch_time_cost"), + PIPE_SCHEMA_BATCH_TIME_COST("pipe_schema_batch_time_cost"), PIPE_TSFILE_BATCH_TIME_COST("pipe_tsfile_batch_time_cost"), PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"), PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"), From 57a5db94fa80b2640daea108e30e1660beb32ebe Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 16:59:38 +0800 Subject: [PATCH 3/5] fix --- .../airgap/IoTDBSchemaRegionAirGapSink.java | 41 +++++++---- .../thrift/sync/IoTDBSchemaRegionSink.java | 39 +++++++---- .../pipe/sink/PipeSchemaRegionSinkTest.java | 69 +++++++++++++++++++ 3 files changed, 123 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index 874acdd6de64d..d15a913b5e050 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -115,24 +115,39 @@ public void transfer(final Event event) throws Exception { private void doTransferWithBatch( final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) throws PipeException, IOException { - if (schemaRegionWritePlanEventBatch.onEvent(event)) { - if (schemaRegionWritePlanEventBatch.shouldEmit()) { - flushBatchedEventsIfNecessary(socket); - } + if (tryTransferInBatch(socket, event)) { return; } - if (!schemaRegionWritePlanEventBatch.isEmpty()) { - flushBatchedEventsIfNecessary(socket); - if (schemaRegionWritePlanEventBatch.onEvent(event)) { - if (schemaRegionWritePlanEventBatch.shouldEmit()) { - flushBatchedEventsIfNecessary(socket); - } - return; - } + doTransferWrapper(socket, event); + } + + private boolean tryTransferInBatch( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (tryAppendToBatchAndFlushIfNecessary(socket, event)) { + return true; } - doTransferWrapper(socket, event); + if (schemaRegionWritePlanEventBatch.isEmpty()) { + return false; + } + + flushBatchedEventsIfNecessary(socket); + return tryAppendToBatchAndFlushIfNecessary(socket, event); + } + + private boolean tryAppendToBatchAndFlushIfNecessary( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (!schemaRegionWritePlanEventBatch.onEvent(event)) { + return false; + } + + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(socket); + } + return true; } private void flushBatchedEventsIfNecessary(final AirGapSocket socket) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index d8965dce0dcd9..26c1464c183f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -105,24 +105,37 @@ public void transfer(final Event event) throws Exception { private void doTransferWithBatch(final PipeSchemaRegionWritePlanEvent event) throws PipeException { - if (schemaRegionWritePlanEventBatch.onEvent(event)) { - if (schemaRegionWritePlanEventBatch.shouldEmit()) { - flushBatchedEventsIfNecessary(); - } + if (tryTransferInBatch(event)) { return; } - if (!schemaRegionWritePlanEventBatch.isEmpty()) { - flushBatchedEventsIfNecessary(); - if (schemaRegionWritePlanEventBatch.onEvent(event)) { - if (schemaRegionWritePlanEventBatch.shouldEmit()) { - flushBatchedEventsIfNecessary(); - } - return; - } + doTransferWrapper(event); + } + + private boolean tryTransferInBatch(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (tryAppendToBatchAndFlushIfNecessary(event)) { + return true; } - doTransferWrapper(event); + if (schemaRegionWritePlanEventBatch.isEmpty()) { + return false; + } + + flushBatchedEventsIfNecessary(); + return tryAppendToBatchAndFlushIfNecessary(event); + } + + private boolean tryAppendToBatchAndFlushIfNecessary(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (!schemaRegionWritePlanEventBatch.onEvent(event)) { + return false; + } + + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(); + } + return true; } private void flushBatchedEventsIfNecessary() throws PipeException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java index d6caae2503af7..586b13b7667d2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java @@ -118,6 +118,45 @@ public void testSyncSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws E } } + @Test + public void testSyncSinkRetriesBatchingAfterFlushingIncompatibleBatch() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent firstEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent secondEvent = + createBatchableEvent("root.db.d2.s1", "pipeB", 1L); + + sink.transfer(firstEvent); + sink.transfer(secondEvent); + + verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(firstEvent.isReleased()); + Assert.assertFalse(secondEvent.isReleased()); + Assert.assertEquals(1, getBatch(sink).size()); + Assert.assertEquals("pipeB", getBatch(sink).getPipeName()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(secondEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + @Test public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception { final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); @@ -141,6 +180,36 @@ public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception { } } + @Test + public void testAirGapSinkRetriesBatchingAfterFlushingIncompatibleBatch() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent firstEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent secondEvent = + createBatchableEvent("root.db.d2.s1", "pipeB", 1L); + + sink.transfer(firstEvent); + sink.transfer(secondEvent); + + Assert.assertEquals(1, sink.getSendCount()); + Assert.assertTrue(firstEvent.isReleased()); + Assert.assertFalse(secondEvent.isReleased()); + Assert.assertEquals(1, getBatch(sink).size()); + Assert.assertEquals("pipeB", getBatch(sink).getPipeName()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + Assert.assertEquals(2, sink.getSendCount()); + Assert.assertTrue(secondEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + @Test public void testAirGapSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws Exception { final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); From 428c6a76cab546dfb12d04eb644055c475810fa9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 17:32:32 +0800 Subject: [PATCH 4/5] Update PipeSchemaRegionSinkMetricsTest.java --- .../PipeSchemaRegionSinkMetricsTest.java | 52 ++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java index 26d77c328f27c..33c4c19081c25 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java @@ -32,9 +32,9 @@ import org.mockito.Mockito; import java.lang.reflect.Field; +import java.util.Map; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -54,17 +54,55 @@ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exceptio when(subtask.getTaskID()).thenReturn(taskId); when(subtask.getAttributeSortedString()).thenReturn("schema_test"); when(subtask.getCreationTime()).thenReturn(1L); - when(metricService.getOrCreateRate(anyString(), any(MetricLevel.class), any(String[].class))) + when( + metricService.getOrCreateRate( + eq(Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) .thenReturn(rate); - when(metricService.getOrCreateHistogram( - anyString(), any(MetricLevel.class), any(String[].class))) - .thenReturn(batchSizeHistogram, batchTimeHistogram, eventSizeHistogram); + when( + metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) + .thenReturn(batchSizeHistogram); + when( + metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) + .thenReturn(batchTimeHistogram); + when( + metricService.getOrCreateHistogram( + eq(Metric.PIPE_CONNECTOR_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"))) + .thenReturn(eventSizeHistogram); final PipeSchemaRegionSinkMetrics metrics = PipeSchemaRegionSinkMetrics.getInstance(); final Field metricServiceField = PipeSchemaRegionSinkMetrics.class.getDeclaredField("metricService"); metricServiceField.setAccessible(true); + final Field connectorMapField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("connectorMap"); + connectorMapField.setAccessible(true); + final Field schemaRateMapField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("schemaRateMap"); + schemaRateMapField.setAccessible(true); + + ((Map) connectorMapField.get(metrics)).clear(); + ((Map) schemaRateMapField.get(metrics)).clear(); metricServiceField.set(metrics, null); try { @@ -112,6 +150,8 @@ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exceptio if (!deregistered) { metrics.deregister(taskId); } + ((Map) connectorMapField.get(metrics)).clear(); + ((Map) schemaRateMapField.get(metrics)).clear(); metricServiceField.set(metrics, null); } } From 8bc6625a0cf78d8a6ba41a9183675e35c0e18ccd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 17:33:16 +0800 Subject: [PATCH 5/5] spt --- .../PipeSchemaRegionSinkMetricsTest.java | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java index 33c4c19081c25..22a5a59662f32 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java @@ -54,39 +54,35 @@ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exceptio when(subtask.getTaskID()).thenReturn(taskId); when(subtask.getAttributeSortedString()).thenReturn("schema_test"); when(subtask.getCreationTime()).thenReturn(1L); - when( - metricService.getOrCreateRate( - eq(Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString()), - eq(MetricLevel.IMPORTANT), - eq(Tag.NAME.toString()), - eq("schema_test"), - eq(Tag.CREATION_TIME.toString()), - eq("1"))) + when(metricService.getOrCreateRate( + eq(Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) .thenReturn(rate); - when( - metricService.getOrCreateHistogram( - eq(Metric.PIPE_SCHEMA_BATCH_SIZE.toString()), - eq(MetricLevel.IMPORTANT), - eq(Tag.NAME.toString()), - eq("schema_test"), - eq(Tag.CREATION_TIME.toString()), - eq("1"))) + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) .thenReturn(batchSizeHistogram); - when( - metricService.getOrCreateHistogram( - eq(Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString()), - eq(MetricLevel.IMPORTANT), - eq(Tag.NAME.toString()), - eq("schema_test"), - eq(Tag.CREATION_TIME.toString()), - eq("1"))) + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) .thenReturn(batchTimeHistogram); - when( - metricService.getOrCreateHistogram( - eq(Metric.PIPE_CONNECTOR_BATCH_SIZE.toString()), - eq(MetricLevel.IMPORTANT), - eq(Tag.NAME.toString()), - eq("schema_test"))) + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_CONNECTOR_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"))) .thenReturn(eventSizeHistogram); final PipeSchemaRegionSinkMetrics metrics = PipeSchemaRegionSinkMetrics.getInstance();