Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,24 @@
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {

temporaryMeta.markDataNodeCompleted(nodeId);
LOGGER.info(
"Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",

Check warning on line 161 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 154).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJNAnQ8skf4BPsgd&open=AZ4-aJNAnQ8skf4BPsgd&pullRequest=17717
nodeId,
staticMeta.getPipeName(),
pipeHeartbeat.getRemainingEventCount(staticMeta),
pipeHeartbeat.getRemainingTime(staticMeta),
temporaryMeta.getCompletedDataNodeIds());

final Set<Integer> uncompletedDataNodeIds =
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
LOGGER.info(
"All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",

Check warning on line 173 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 140).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJNAnQ8skf4BPsge&open=AZ4-aJNAnQ8skf4BPsge&pullRequest=17717
staticMeta.getPipeName(),
temporaryMeta.getGlobalRemainingEvents(),
temporaryMeta.getGlobalRemainingTime(),
staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
LOGGER.info(
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
import org.apache.iotdb.db.pipe.source.schemaregion.PipePlanTreePrivilegeParseVisitor;
Expand Down Expand Up @@ -138,12 +139,20 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th

if (skipParsing || !forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
if (sourceEvent.isGeneratedByHistoricalExtractor()) {
PipeTerminateEvent.markHistoricalTsFileUnsplit(
sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId);
}
return;
}

try {
sourceEvent.consumeTabletInsertionEventsWithRetry(
this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent");
if (sourceEvent.isGeneratedByHistoricalExtractor()) {
PipeTerminateEvent.markHistoricalTsFileSplit(
sourceEvent.getPipeName(), sourceEvent.getCreationTime(), regionId);
}
} finally {
sourceEvent.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
Expand Down Expand Up @@ -181,6 +182,12 @@ protected boolean executeOnce() throws Exception {
}
},
"PipeProcessorSubtask::executeOnce");
if (tsFileInsertionEvent.isGeneratedByHistoricalExtractor()) {
PipeTerminateEvent.markHistoricalTsFileSplit(
tsFileInsertionEvent.getPipeName(),
tsFileInsertionEvent.getCreationTime(),
regionId);
}
if (ex.get() != null) {
throw ex.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
Expand All @@ -45,6 +52,8 @@
*/
public class PipeTerminateEvent extends EnrichedEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTerminateEvent.class);

private final int dataRegionId;

private final boolean shouldMark;
Expand All @@ -58,6 +67,9 @@
// Do not use call run policy to avoid deadlock
private static final ExecutorService terminateExecutor = createTerminateExecutor();

private static final ConcurrentMap<HistoricalTransferKey, HistoricalTransferSummaryCounter>
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP = new ConcurrentHashMap<>();

private static ExecutorService createTerminateExecutor() {
final WrappedThreadPoolExecutor executor =
new WrappedThreadPoolExecutor(
Expand Down Expand Up @@ -145,6 +157,18 @@
}

public void markCompleted() {
final HistoricalTransferSummary summary =
snapshotAndClearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
if (Objects.nonNull(summary)) {
LOGGER.info(
"Pipe {}@{}: terminate event committed for historical transfer. creationTime: {}, shouldMark: {}. {}",

Check warning on line 164 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 112).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJMVnQ8skf4BPsga&open=AZ4-aJMVnQ8skf4BPsga&pullRequest=17717
pipeName,
dataRegionId,
creationTime,
shouldMark,
summary.toReportMessage());

Check warning on line 169 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Invoke method(s) only conditionally.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJMVnQ8skf4BPsgZ&open=AZ4-aJMVnQ8skf4BPsgZ&pullRequest=17717
}

// To avoid deadlock
if (shouldMark) {
terminateExecutor.submit(
Expand All @@ -159,4 +183,159 @@
+ " - "
+ super.toString();
}

public static void initializeHistoricalTransferSummary(
final String pipeName,
final long creationTime,
final int dataRegionId,
final long extractedHistoricalTsFileCount,
final long extractedHistoricalDeletionCount) {
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP
.computeIfAbsent(
new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
ignored -> new HistoricalTransferSummaryCounter())
.initialize(extractedHistoricalTsFileCount, extractedHistoricalDeletionCount);
}

public static void markHistoricalTsFileSkipped(
final String pipeName, final long creationTime, final int dataRegionId) {
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
.skippedHistoricalTsFileCount
.incrementAndGet();
}

public static void markHistoricalTsFileSplit(
final String pipeName, final long creationTime, final int dataRegionId) {
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
.splitHistoricalTsFileCount
.incrementAndGet();
}

public static void markHistoricalTsFileUnsplit(
final String pipeName, final long creationTime, final int dataRegionId) {
getOrCreateHistoricalTransferSummaryCounter(pipeName, creationTime, dataRegionId)
.unsplitHistoricalTsFileCount
.incrementAndGet();
}

public static HistoricalTransferSummary snapshotHistoricalTransferSummary(
final String pipeName, final long creationTime, final int dataRegionId) {
final HistoricalTransferSummaryCounter counter =
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.get(
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
return Objects.nonNull(counter) ? counter.snapshot() : null;
}

public static void clearHistoricalTransferSummary(
final String pipeName, final long creationTime, final int dataRegionId) {
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
}

private static HistoricalTransferSummary snapshotAndClearHistoricalTransferSummary(
final String pipeName, final long creationTime, final int dataRegionId) {
final HistoricalTransferSummaryCounter counter =
HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.remove(
new HistoricalTransferKey(pipeName, creationTime, dataRegionId));
return Objects.nonNull(counter) ? counter.snapshot() : null;
}

private static HistoricalTransferSummaryCounter getOrCreateHistoricalTransferSummaryCounter(
final String pipeName, final long creationTime, final int dataRegionId) {
return HISTORICAL_TRANSFER_SUMMARY_COUNTER_MAP.computeIfAbsent(
new HistoricalTransferKey(pipeName, creationTime, dataRegionId),
ignored -> new HistoricalTransferSummaryCounter());
}

public static final class HistoricalTransferSummary {

private final long extractedHistoricalTsFileCount;
private final long skippedHistoricalTsFileCount;
private final long splitHistoricalTsFileCount;
private final long unsplitHistoricalTsFileCount;
private final long extractedHistoricalDeletionCount;

private HistoricalTransferSummary(
final long extractedHistoricalTsFileCount,
final long skippedHistoricalTsFileCount,
final long splitHistoricalTsFileCount,
final long unsplitHistoricalTsFileCount,
final long extractedHistoricalDeletionCount) {
this.extractedHistoricalTsFileCount = extractedHistoricalTsFileCount;
this.skippedHistoricalTsFileCount = skippedHistoricalTsFileCount;
this.splitHistoricalTsFileCount = splitHistoricalTsFileCount;
this.unsplitHistoricalTsFileCount = unsplitHistoricalTsFileCount;
this.extractedHistoricalDeletionCount = extractedHistoricalDeletionCount;
}

public String toReportMessage() {
return String.format(
"historical summary: extractedTsFileCount=%s, skippedTsFileCount=%s, splitTsFileCount=%s, unsplitTsFileCount=%s, deletionCount=%s",

Check warning on line 273 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 141).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJMVnQ8skf4BPsgb&open=AZ4-aJMVnQ8skf4BPsgb&pullRequest=17717
extractedHistoricalTsFileCount,
skippedHistoricalTsFileCount,
splitHistoricalTsFileCount,
unsplitHistoricalTsFileCount,
extractedHistoricalDeletionCount);
}
}

private static final class HistoricalTransferSummaryCounter {

private final AtomicLong extractedHistoricalTsFileCount = new AtomicLong(0);
private final AtomicLong skippedHistoricalTsFileCount = new AtomicLong(0);
private final AtomicLong splitHistoricalTsFileCount = new AtomicLong(0);
private final AtomicLong unsplitHistoricalTsFileCount = new AtomicLong(0);
private final AtomicLong extractedHistoricalDeletionCount = new AtomicLong(0);

private void initialize(
final long extractedHistoricalTsFileCount, final long extractedHistoricalDeletionCount) {
this.extractedHistoricalTsFileCount.set(extractedHistoricalTsFileCount);
this.skippedHistoricalTsFileCount.set(0);
this.splitHistoricalTsFileCount.set(0);
this.unsplitHistoricalTsFileCount.set(0);
this.extractedHistoricalDeletionCount.set(extractedHistoricalDeletionCount);
}

private HistoricalTransferSummary snapshot() {
return new HistoricalTransferSummary(
extractedHistoricalTsFileCount.get(),
skippedHistoricalTsFileCount.get(),
splitHistoricalTsFileCount.get(),
unsplitHistoricalTsFileCount.get(),
extractedHistoricalDeletionCount.get());
}
}

private static final class HistoricalTransferKey {

private final String pipeName;
private final long creationTime;
private final int dataRegionId;

private HistoricalTransferKey(
final String pipeName, final long creationTime, final int dataRegionId) {
this.pipeName = pipeName;
this.creationTime = creationTime;
this.dataRegionId = dataRegionId;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof HistoricalTransferKey)) {
return false;
}
final HistoricalTransferKey that = (HistoricalTransferKey) obj;
return creationTime == that.creationTime
&& dataRegionId == that.dataRegionId
&& Objects.equals(pipeName, that.pipeName);
}

@Override
public int hashCode() {
return Objects.hash(pipeName, creationTime, dataRegionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@
new HashMap<>();
private final Map<PersistentResource, Long> pendingResource2ReplicateIndexForIoTV2 =
new HashMap<>();
private int extractedHistoricalTsFileCount = 0;
private int extractedHistoricalDeletionCount = 0;

@Override
public void validate(final PipeParameterValidator validator) {
Expand Down Expand Up @@ -488,6 +490,8 @@
return;
}
hasBeenStarted = true;
extractedHistoricalTsFileCount = 0;
extractedHistoricalDeletionCount = 0;

final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
Expand Down Expand Up @@ -521,6 +525,12 @@
? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
: o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex()));
pendingQueue = new ArrayDeque<>(originalResourceList);
PipeTerminateEvent.initializeHistoricalTransferSummary(
pipeName,
creationTime,
dataRegionId,
extractedHistoricalTsFileCount,
extractedHistoricalDeletionCount);

LOGGER.info(
DataNodePipeMessages.PIPE_FINISH_TO_SORT_ALL_EXTRACTED_RESOURCES,
Expand Down Expand Up @@ -649,6 +659,7 @@
return true;
}
});
extractedHistoricalTsFileCount = filteredTsFileResources2TableNames.size();

LOGGER.info(
"Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, "
Expand Down Expand Up @@ -798,6 +809,7 @@
})
.collect(Collectors.toList());
resourceList.addAll(allDeletionResources);
extractedHistoricalDeletionCount = allDeletionResources.size();
LOGGER.info(
DataNodePipeMessages.PIPE_FINISH_TO_EXTRACT_DELETIONS_EXTRACT_DELETIONS,
pipeName,
Expand Down Expand Up @@ -841,6 +853,16 @@
}

private Event supplyTerminateEvent() {
final PipeTerminateEvent.HistoricalTransferSummary historicalTransferSummary =
PipeTerminateEvent.snapshotHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
if (Objects.nonNull(historicalTransferSummary)) {
LOGGER.info(
"Pipe {}@{}: historical source has supplied all events, emitting terminate event. {}",
pipeName,
dataRegionId,
historicalTransferSummary.toReportMessage());

Check warning on line 863 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Invoke method(s) only conditionally.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4-aJMvnQ8skf4BPsgc&open=AZ4-aJMvnQ8skf4BPsgc&pullRequest=17717
}

final PipeTerminateEvent terminateEvent =
new PipeTerminateEvent(
pipeName,
Expand All @@ -867,6 +889,7 @@
}

filteredTsFileResources2TableNames.remove(resource);
PipeTerminateEvent.markHistoricalTsFileSkipped(pipeName, creationTime, dataRegionId);
LOGGER.info(
DataNodePipeMessages.PIPE_SKIP_HISTORICAL_TSFILE_BECAUSE_REALTIME_SOURCE,
pipeName,
Expand Down Expand Up @@ -1074,6 +1097,9 @@

@Override
public synchronized void close() {
if (!isTerminateSignalSent) {
PipeTerminateEvent.clearHistoricalTransferSummary(pipeName, creationTime, dataRegionId);
}
if (Objects.nonNull(pendingQueue)) {
pendingQueue.forEach(
resource -> {
Expand Down
Loading
Loading