diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java index ecf9b907aa6ef..ab6cc39f41338 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java @@ -108,6 +108,18 @@ else if (!persistenceEnable()) { /** Test checkpoint command with persistence enabled. */ @Test public void testCheckpointPersistenceCluster() throws Exception { + LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build(); + LogListener startMetricsLsnr = LogListener.matches(Pattern.compile( + "^.*Checkpoint started .*checkpointBeforeLockTime=\\d+ms.*$")).build(); + LogListener skipMetricsCheckpointLsnr = LogListener.matches(Pattern.compile( + "^.*Skipping checkpoint .*checkpointListenersExecuteTime=\\d+ms.*$")).build(); + LogListener finishMetricsCheckPointLsnr = LogListener.matches(Pattern.compile( + "^.*Checkpoint finished .*total=\\d+ms.*$")).build(); + listeningLog.registerListener(checkpointReasonLsnr); + listeningLog.registerListener(startMetricsLsnr); + listeningLog.registerListener(skipMetricsCheckpointLsnr); + listeningLog.registerListener(finishMetricsCheckPointLsnr); + persistenceEnable(true); IgniteEx srv = startGrids(2); @@ -133,12 +145,10 @@ public void testCheckpointPersistenceCluster() throws Exception { assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--reason", "test_reason")); - LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build(); - - listeningLog.registerListener(checkpointReasonLsnr); - assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000)); assertTrue(GridTestUtils.waitForCondition(checkpointReasonLsnr::check, 10_000)); + 
assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000)); assertFalse(testOut.toString().contains("persistence disabled")); outputContains(": Checkpoint started"); @@ -146,11 +156,15 @@ public void testCheckpointPersistenceCluster() throws Exception { testOut.reset(); checkpointFinishedLsnr.reset(); + startMetricsLsnr.reset(); + finishMetricsCheckPointLsnr.reset(); cacheCli.put(3, 3); assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--wait-for-finish")); assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000)); assertFalse(testOut.toString().contains("persistence disabled")); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java index aa9d6abb77557..49ef15acd26ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java @@ -18,16 +18,16 @@ package org.apache.ignite.internal; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_DISABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT; @@ -56,9 +56,12 @@ public class LongJVMPauseDetector { public static final int DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT = 20; /** Precision. */ - private static final int PRECISION = + private static final long PRECISION = getInteger(IGNITE_JVM_PAUSE_DETECTOR_PRECISION, DFLT_JVM_PAUSE_DETECTOR_PRECISION); + /** Precision. */ + private static final long PRECISION_NANOS = PRECISION * 1_000_000L; + /** Threshold. */ private static final int THRESHOLD = getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); @@ -83,15 +86,19 @@ public class LongJVMPauseDetector { private long longPausesCnt; /** Long pause total duration. */ - private long longPausesTotalDuration; + private long longPausesTotalDurationNanos; /** Last detector's wake up time. */ - private long lastWakeUpTime; + private long lastWakeUpTimeNanos = getMonotonicTimeNanos(); /** Long pauses timestamps. */ @GridToStringInclude private final long[] longPausesTimestamps = new long[EVT_CNT]; + /** Long pauses monotonic timestamps. */ + @GridToStringExclude + private final long[] longPausesMonotonicTimestamps = new long[EVT_CNT]; + /** Long pauses durations. 
*/ @GridToStringInclude private final long[] longPausesDurations = new long[EVT_CNT]; @@ -117,40 +124,31 @@ public void start() { } final Thread worker = new IgniteThread(igniteInstanceName, "jvm-pause-detector-worker", () -> { - synchronized (this) { - lastWakeUpTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - if (log.isDebugEnabled()) log.debug(Thread.currentThread().getName() + " has been started."); - while (true) { try { - Thread.sleep(PRECISION); - - final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - final long pause = now - PRECISION - lastWakeUpTime; - - if (pause >= THRESHOLD) { - log.warning("Possible too long JVM pause: " + pause + " milliseconds."); - - synchronized (this) { - final int next = (int)(longPausesCnt % EVT_CNT); - - longPausesCnt++; - - longPausesTotalDuration += pause; - - longPausesTimestamps[next] = now; - - longPausesDurations[next] = pause; - - lastWakeUpTime = now; + synchronized (this) { + lastWakeUpTimeNanos = getMonotonicTimeNanos(); + long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; + while (getMonotonicTimeNanos() < awaitDeadline) { + long monotonicTimeNanos = getMonotonicTimeNanos(); + long waitNanos = awaitDeadline - monotonicTimeNanos; + long waitMillis = Math.max(0, (long)Math.ceil(waitNanos / 1_000_000d)); + awaitDeadline = monotonicTimeNanos + (waitMillis * 1_000_000); + wait(waitMillis); } - } - else { - synchronized (this) { - lastWakeUpTime = now; + long nanoTime = getMonotonicTimeNanos(); + long pause = nanoTime - awaitDeadline; + long pauseMillis = pause / 1_000_000; + if (pauseMillis >= THRESHOLD) { + log.warning("Possible too long JVM pause: " + + "between " + pauseMillis + " and " + (pauseMillis + PRECISION) + " ms. 
"); + final int next = (int) (longPausesCnt++ % EVT_CNT); + longPausesTotalDurationNanos += pause; + longPausesTimestamps[next] = System.currentTimeMillis(); + longPausesMonotonicTimestamps[next] = nanoTime; + longPausesDurations[next] = pauseMillis; } } } @@ -209,14 +207,7 @@ synchronized long longPausesCount() { * @return Long JVM pauses total duration. */ synchronized long longPausesTotalDuration() { - return longPausesTotalDuration; - } - - /** - * @return Last checker's wake up time. - */ - public synchronized long getLastWakeUpTime() { - return lastWakeUpTime; + return TimeUnit.NANOSECONDS.toMillis(longPausesTotalDurationNanos); } /** @@ -232,20 +223,42 @@ synchronized Map longPauseEvents() { } /** - * @return Pair ({@code last long pause event time}, {@code pause time duration}) or {@code null}, if long pause - * wasn't occurred. + * @param cpStart Check point start time in nanos. + * @return Tries to explain total pauses spotted during check point process + * or {@link Optional#empty()} if none were found */ - public synchronized @Nullable IgniteBiTuple getLastLongPause() { - int lastPauseIdx = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT); - - if (longPausesTimestamps[lastPauseIdx] == 0) - return null; - - return new IgniteBiTuple<>(longPausesTimestamps[lastPauseIdx], longPausesDurations[lastPauseIdx]); + public Optional getTotalSpottedPausesExplain(long cpStart) { + int lastPointer = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT); + int pausesSpottedTimes = 0; + int curPointer = lastPointer; + StringBuilder explainBuilder = new StringBuilder(); + synchronized (this) { + do { + if (longPausesMonotonicTimestamps[curPointer] <= cpStart) + break; + explainBuilder.append(longPausesDurations[curPointer]).append(" ms at ") + .append(longPausesTimestamps[curPointer]).append(";"); + pausesSpottedTimes++; + curPointer = curPointer == 0 ? EVT_CNT - 1 : curPointer - 1; + } while (curPointer != lastPointer); + } + return pausesSpottedTimes == 0 ? 
+ Optional.empty() : + Optional.of(String.format("Pause detecor spotted %d pauses: [%s]. Each with precision %d ms", + pausesSpottedTimes, + explainBuilder, + PRECISION)); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(LongJVMPauseDetector.class, this); } + + /** + * @return monotonic time in nanos + */ + protected long getMonotonicTimeNanos() { + return System.nanoTime(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java new file mode 100644 index 0000000000000..2b75c3c6925d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +import java.util.EnumMap; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.LongJVMPauseDetector; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteCommonsSystemProperties.getInteger; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD; +import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD; + +/** + * Tracks various checkpoint phases and stats. + * + * Assumed sequence of events: + *
<ol>
+ *     <li>Checkpoint start</li>
+ *     <li>CP Lock wait start</li>
+ *     <li>CP mark start</li>
+ *     <li>CP Lock release</li>
+ *     <li>Pages write start</li>
+ *     <li>fsync start</li>
+ *     <li>Checkpoint end</li>
+ * </ol>
+ */ +public class CheckpointMetricsTracker { + /** Checkpoint started log message format. */ + private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" + + "checkpointId=%s, " + + "startPtr=%s, " + + "checkpointBeforeLockTime=%dms, " + + "checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, " + + "checkpointLockHoldTime=%dms, " + + "walCpRecordFsyncDuration=%dms, " + + "splitAndSortCpPagesDuration=%dms, " + + "writeRecoveryDataDuration=%dms, " + + "writeCheckpointEntryDuration=%dms, " + + "%s" + + "pages=%d, " + + "reason='%s']"; + + /** Executor service to provide calculations and metrics export */ + public static final ExecutorService CHECK_POINT_METRICS_EXPORT_EXECUTOR = + Executors.newSingleThreadExecutor((runnable) -> { + Thread thread = new Thread(runnable); + try { + thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY - 1); + thread.setName("check-point-metrics-export-thread"); + } + catch (SecurityException ignored) { + // do nothing + } + return thread; + }); + + static { + Runtime runtime = Runtime.getRuntime(); + Thread shutdownExecutor = new Thread(() -> { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.shutdown(); + try { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.awaitTermination(60, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + runtime.addShutdownHook(shutdownExecutor); + } + + /** */ + private static final AtomicIntegerFieldUpdater DATA_PAGES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "dataPages"); + + /** */ + private static final AtomicIntegerFieldUpdater COW_PAGES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages"); + + /** Long JVM pause threshold. */ + private final int longJvmPauseThreshold = + getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); + + /** Ignite logger. 
*/ + private final IgniteLogger log; + + /** Previously calculated duration storage. */ + private final EnumMap durationsStorage = new EnumMap<>(Duration.class); + + /** Pause detector. */ + private final LongJVMPauseDetector pauseDetector; + + /** */ + private volatile int dataPages; + + /** */ + private volatile int cowPages; + + /** */ + private final long cpStart = System.nanoTime(); + + /** Epoch time of checkpoint process start */ + private final long cpStartEpochTime = System.currentTimeMillis(); + + /** */ + private long cpLockWaitStart; + + /** */ + private long cpMarkStart; + + /** */ + private long cpMarkEnd; + + /** */ + private long cpLockRelease; + + /** */ + private long cpPagesWriteStart; + + /** */ + private long cpFsyncStart; + + /** */ + private long cpEnd; + + /** */ + private long walCpRecordFsyncStart; + + /** */ + private long walCpRecordFsyncEnd; + + /** */ + private long cpMarkerStoreEnd; + + /** */ + private long splitAndSortCpPagesEnd; + + /** */ + private long cpRecoveryDataWriteEnd; + + /** */ + private long cpRecoveryDataSize; + + /** */ + private long listenersExecEnd; + + /** + * @param log - current ignite logger instance + */ + public CheckpointMetricsTracker(IgniteLogger log, LongJVMPauseDetector pauseDetector) { + this.log = log; + this.pauseDetector = pauseDetector; + } + + /** + * Increments counter if copy on write page was written. + */ + public void onCowPageWritten() { + COW_PAGES_UPDATER.incrementAndGet(this); + } + + /** */ + public void onDataPageWritten() { + DATA_PAGES_UPDATER.incrementAndGet(this); + } + + /** + * @return COW pages. + */ + public int cowPagesWritten() { + return cowPages; + } + + /** + * @return Data pages written. 
+ */ + public int dataPagesWritten() { + return dataPages; + } + + /** */ + public void onLockWaitStart() { + cpLockWaitStart = System.nanoTime(); + } + + /** */ + public void onMarkStart() { + cpMarkStart = System.nanoTime(); + } + + /** */ + public void onMarkEnd() { + cpMarkEnd = System.nanoTime(); + } + + /** */ + public void onLockRelease() { + cpLockRelease = System.nanoTime(); + } + + /** */ + public void onPagesWriteStart() { + cpPagesWriteStart = System.nanoTime(); + } + + /** */ + public void onFsyncStart() { + cpFsyncStart = System.nanoTime(); + } + + /** */ + public void onEnd() { + cpEnd = System.nanoTime(); + } + + /** */ + public void onListenersExecuteEnd() { + listenersExecEnd = System.nanoTime(); + } + + /** */ + public void onWalCpRecordFsyncStart() { + walCpRecordFsyncStart = System.nanoTime(); + } + + /** */ + public void onCpMarkerStoreEnd() { + cpMarkerStoreEnd = System.nanoTime(); + } + + /** */ + public void onSplitAndSortCpPagesEnd() { + splitAndSortCpPagesEnd = System.nanoTime(); + } + + /** */ + public void onWalCpRecordFsyncEnd() { + walCpRecordFsyncEnd = System.nanoTime(); + } + + /** */ + public void onWriteRecoveryDataEnd(long recoveryDataSize) { + cpRecoveryDataSize = recoveryDataSize; + cpRecoveryDataWriteEnd = System.nanoTime(); + } + + /** + * Tries to log message with checkpoint recovery data write start details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckPointRecoveryDataWriteStart(Checkpoint chp) { + int pagesSize = chp.pagesSize; + Object checkpointId = chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(); + Object checkpointMark = chp.cpEntry == null ? 
"" : chp.cpEntry.checkpointMark(); + String recoveryDataWriteStartReason = chp.progress.reason(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info(String.format("Checkpoint recovery data write started [" + + "checkpointId=%s, " + + "startPtr=%s, " + + "pages=%d, " + + "checkpointBeforeLockTime=%dms, " + + "checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, " + + "checkpointLockHoldTime=%dms, " + + "walCpRecordFsyncDuration=%dms, " + + "splitAndSortCpPagesDuration=%dms, " + + "reason='%s']", + checkpointId, + checkpointMark, + pagesSize, + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + recoveryDataWriteStartReason + )); + } + }); + } + + /** + * Tries to log message with checkpoint start details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckPointStart(Checkpoint chp) { + int pagesSize = chp.pagesSize; + String checkPointStartReason = chp.progress.reason(); + Object checkpointId = chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(); + Object checkpointMark = chp.cpEntry == null ? 
"" : chp.cpEntry.checkpointMark(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info( + String.format( + CHECKPOINT_STARTED_LOG_FORMAT, + checkpointId, + checkpointMark, + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + possibleLongJvmPauseExplaination(), + pagesSize, + checkPointStartReason)); + } + }); + } + + /** + * Tries to log message with checkpoint finish details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckpointFinish(Checkpoint chp) { + int pagesSize = chp.pagesSize; + Object checkpointId = chp.cpEntry != null ? chp.cpEntry.checkpointId() : ""; + Object checkpointMark = chp.cpEntry != null ? chp.cpEntry.checkpointMark() : ""; + IgniteBiTuple walSegsCoveredRange = chp.walSegsCoveredRange; + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + + "walSegmentsCovered=%s, markDuration=%dms, recoveryWrite=%dms, pagesWrite=%dms, " + + "fsync=%dms, total=%dms]", + checkpointId, + pagesSize, + checkpointMark, + walRangeStr(walSegsCoveredRange), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.TOTAL))); + } + }); + } + + /** + * Tries to log message with checkpoint skip details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckpointSkip(Checkpoint chp) { + String skipReason = chp.progress.reason(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + LT.info(log, String.format( + "Skipping checkpoint (no pages were modified) [" + + "checkpointBeforeLockTime=%dms, 
checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']", + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + skipReason) + ); + } + }); + } + + /** + * Updates performance statistics if feature is enabled + * @param chp Checkpoint (This metrics owner) + * @param psproc Performance statistics processor. + */ + void updatePerformanceStatistics(Checkpoint chp, PerformanceStatisticsProcessor psproc) { + int pagesSize = chp.pagesSize; + int dataPagesWritten = dataPagesWritten(); + int cowPagesWritten = cowPagesWritten(); + invokeLater(() -> { + if (psproc.enabled()) { + psproc.checkpoint( + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.TOTAL), + cpStartEpochTime, + pagesSize, + dataPagesWritten, + cowPagesWritten); + } + }); + } + + /** + * Persists metrics of Checkpoint if feature is enabled + * @param chp Checkpoint (This metrics owner) + * @param persStoreMetrics Metrics persistence storage. + * @param cacheProc Cache processor. 
+ */ + void storeMetrics(Checkpoint chp, DataStorageMetricsImpl persStoreMetrics, GridCacheProcessor cacheProc) { + boolean metricsEnabled = persStoreMetrics.metricsEnabled(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cacheProc.context().database(); + int pagesSize = chp.pagesSize; + long pageStoresSize = dbMgr.forAllPageStores(PageStore::size); + long pageStoresSparseSize = dbMgr.forAllPageStores(PageStore::getSparseSize); + int cowPagesWritten = cowPagesWritten(); + int dataPagesWritten = dataPagesWritten(); + invokeLater(() -> { + if (metricsEnabled) { + persStoreMetrics.onCheckpoint( + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.TOTAL), + cpStartEpochTime, + pagesSize, + dataPagesWritten, + cowPagesWritten, + cpRecoveryDataSize, + pageStoresSize, + pageStoresSparseSize + ); + } + }); + } + + /** + * Creates a string of a range WAL segments. + * + * @param walRange Range of WAL segments. + * @return The message about how many WAL segments was between previous checkpoint and current one. + */ + private String walRangeStr(@Nullable IgniteBiTuple walRange) { + if (walRange == null) + return ""; + + String res; + + long startIdx = walRange.get1(); + long endIdx = walRange.get2(); + + if (endIdx < 0 || endIdx < startIdx) + res = "[]"; + else if (endIdx == startIdx) + res = "[" + endIdx + "]"; + else + res = "[" + startIdx + " - " + endIdx + "]"; + + return res; + } + + /** + * @return Explain possible JVM pause. 
+ */ + private String possibleLongJvmPauseExplaination() { + long lockDuration = getDurationMillis(Duration.TOTAL_LOCK); + if (LongJVMPauseDetector.enabled() && lockDuration > longJvmPauseThreshold) { + StringBuilder explainBuilder = new StringBuilder("Long JVM pause is spotted! Checkpoint lock took ") + .append(lockDuration).append("ms, "); + Optional totalSpottedPausesExplain = pauseDetector.getTotalSpottedPausesExplain(cpStart); + totalSpottedPausesExplain.ifPresent(explainBuilder::append); + totalSpottedPausesExplain.ifPresent(ignored -> explainBuilder.append(", ")); + return explainBuilder.toString(); + } + return ""; + } + + /** + * Tries to execute task, will not throw {@link java.util.concurrent.RejectedExecutionException} + * @param runnable Runnable. + */ + private void invokeLater(Runnable runnable) { + try { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.submit(runnable); + } + catch (RejectedExecutionException ignored) { + // do nothing, looks like it is the end + } + } + + /** + * Tries to get previously calculated result + * or calculates duration + * @param durationEntry Duration entry. + */ + private long getDurationMillis(Duration durationEntry) { + return durationsStorage.computeIfAbsent(durationEntry, duration -> duration.getMillis(this)); + } + + /** Enum contains all tracked durations, it's calculation and comment */ + private enum Duration { + /** Total checkpoint duration. */ + TOTAL(tracker -> tracker.cpEnd - tracker.cpStart), + /** Since start due to write lock acquisition */ + BEFORE_LOCK(tracker -> tracker.cpLockWaitStart - tracker.cpStart), + /** Checkpoint lock wait duration. 
*/ + LOCK_WAIT(tracker -> tracker.cpMarkStart - tracker.cpLockWaitStart), + /** Checkpoint lock hold duration */ + LOCK_HOLD(tracker -> tracker.cpLockRelease - tracker.cpMarkStart), + /** Lock wait and hold total duration */ + TOTAL_LOCK(tracker -> tracker.cpLockRelease - tracker.cpLockWaitStart), + /** Fire listeners under write lock duration */ + LISTENERS_EXECUTE(tracker -> tracker.listenersExecEnd - tracker.cpMarkStart), + /** Checkpoint mark duration */ + MARK(tracker -> tracker.cpMarkEnd - tracker.cpMarkStart), + /** Pages write duration */ + PAGES_WRITE(tracker -> tracker.cpFsyncStart - tracker.cpPagesWriteStart), + /** Checkpoint fsync duration */ + FSYNC(tracker -> tracker.cpEnd - tracker.cpFsyncStart), + /** Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. */ + WAL_CP_RECORD_FSYNC(tracker -> tracker.walCpRecordFsyncEnd - tracker.walCpRecordFsyncStart), + /** Duration of splitting and sorting checkpoint pages */ + SPLIT_AND_SORT_CP_PAGES(tracker -> tracker.splitAndSortCpPagesEnd - tracker.walCpRecordFsyncEnd), + /** Duration of writing recovery data */ + RECOVERY_DATA_WRITE(tracker -> tracker.cpRecoveryDataWriteEnd - tracker.cpMarkEnd), + /** + * Duration of checkpoint entry buffer writing to file. + * + * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) + */ + WRITE_CHECKPOINT_ENTRY(tracker -> tracker.cpMarkerStoreEnd - tracker.cpRecoveryDataWriteEnd); + + /** Duration nano time calculation function */ + private final Function durationNanosFunction; + + /** + * @param durationNanosFunction Duration nano time calculation function. + */ + Duration(Function durationNanosFunction) { + this.durationNanosFunction = durationNanosFunction; + } + + /** + * @param tracker Tracker. 
+ */ + private Long getMillis(CheckpointMetricsTracker tracker) { + Long nanos = durationNanosFunction.apply(tracker); + return TimeUnit.NANOSECONDS.toMillis(nanos); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java index 3a508ca679558..f81566234aa21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java index a261823a69c82..69f663673a60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.pagemem.store.PageStore; import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java index 47480dace142a..6f7ed7bb61cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.StorageException; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index bd5c3c9f29811..406e438a49a74 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -32,6 +32,7 @@ import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -48,10 +49,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.failure.FailureProcessor; @@ -61,22 +60,17 @@ import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; import org.apache.ignite.internal.worker.WorkersRegistry; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import 
org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.getBoolean; -import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; -import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD; import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP; @@ -100,22 +94,6 @@ */ @SuppressWarnings("NakedNotify") public class Checkpointer extends GridWorker { - /** Checkpoint started log message format. */ - private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" + - "checkpointId=%s, " + - "startPtr=%s, " + - "checkpointBeforeLockTime=%dms, " + - "checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, " + - "checkpointLockHoldTime=%dms, " + - "walCpRecordFsyncDuration=%dms, " + - "splitAndSortCpPagesDuration=%dms, " + - "writeRecoveryDataDuration=%dms, " + - "writeCheckpointEntryDuration=%dms, " + - "%s" + - "pages=%d, " + - "reason='%s']"; - /** Skip sync. */ private final boolean skipSync = getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC); @@ -125,10 +103,6 @@ public class Checkpointer extends GridWorker { /** Avoid the start checkpoint if checkpointer was canceled. */ private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); - /** Long JVM pause threshold. 
*/ - private final int longJvmPauseThreshold = - getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); - /** Pause detector. */ private final LongJVMPauseDetector pauseDetector; @@ -395,7 +369,7 @@ private void doCheckpoint() { Checkpoint chp = null; try { - CheckpointMetricsTracker tracker = new CheckpointMetricsTracker(); + CheckpointMetricsTracker tracker = new CheckpointMetricsTracker(log, pauseDetector); startCheckpointProgress(); @@ -414,31 +388,7 @@ private void doCheckpoint() { checkpointRecoveryFileStorage.clear(); if (writeRecoveryData) { - if (log.isInfoEnabled()) { - log.info(String.format("Checkpoint recovery data write started [" + - "checkpointId=%s, " + - "startPtr=%s, " + - "pages=%d, " + - "checkpointBeforeLockTime=%dms, " + - "checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, " + - "checkpointLockHoldTime=%dms, " + - "walCpRecordFsyncDuration=%dms, " + - "splitAndSortCpPagesDuration=%dms, " + - "reason='%s']", - chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), - chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(), - chp.pagesSize, - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.splitAndSortCpPagesDuration(), - chp.progress.reason() - )); - } - + tracker.logCheckPointRecoveryDataWriteStart(chp); recoveryDataSize = writeRecoveryData(chp); } } @@ -464,45 +414,12 @@ private void doCheckpoint() { updateHeartbeat(); if (chp.hasDelta()) { - if (log.isInfoEnabled()) { - long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker); - - log.info( - String.format( - CHECKPOINT_STARTED_LOG_FORMAT, - chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), - chp.cpEntry == null ? 
"" : chp.cpEntry.checkpointMark(), - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.writeCheckpointEntryDuration(), - possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "", - chp.pagesSize, - chp.progress.reason() - ) - ); - } - + tracker.logCheckPointStart(chp); if (!writePages(tracker, chp.cpPages, chp.progress, this, this::isShutdownNow)) return; } else { - if (log.isInfoEnabled()) - LT.info(log, String.format( - "Skipping checkpoint (no pages were modified) [" + - "checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']", - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - chp.progress.reason()) - ); - + tracker.logCheckpointSkip(chp); tracker.onPagesWriteStart(); tracker.onFsyncStart(); } @@ -514,24 +431,11 @@ private void doCheckpoint() { tracker.onEnd(); - if (chp.hasDelta() || destroyedPartitionsCnt > 0) { - if (log.isInfoEnabled()) { - log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + - "walSegmentsCovered=%s, markDuration=%dms, recoveryWrite=%dms, pagesWrite=%dms, " + - "fsync=%dms, total=%dms]", - chp.cpEntry != null ? chp.cpEntry.checkpointId() : "", - chp.pagesSize, - chp.cpEntry != null ? 
chp.cpEntry.checkpointMark() : "", - walRangeStr(chp.walSegsCoveredRange), - tracker.markDuration(), - tracker.recoveryDataWriteDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration())); - } - } + if (chp.hasDelta() || destroyedPartitionsCnt > 0) + tracker.logCheckpointFinish(chp); - updateMetrics(chp, tracker); + tracker.updatePerformanceStatistics(chp, psproc); + tracker.storeMetrics(chp, persStoreMetrics, cacheProcessor); } catch (IgniteCheckedException e) { chp.progress.fail(e); @@ -686,83 +590,6 @@ boolean writePages( return true; } - /** - * @param chp Checkpoint. - * @param tracker Tracker. - */ - private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) { - if (psproc.enabled()) { - psproc.checkpoint( - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.markDuration(), - tracker.lockHoldDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.writeCheckpointEntryDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.totalDuration(), - tracker.checkpointStartTime(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten()); - } - - if (persStoreMetrics.metricsEnabled()) { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cacheProcessor.context().database(); - - persStoreMetrics.onCheckpoint( - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.markDuration(), - tracker.lockHoldDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.writeCheckpointEntryDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.totalDuration(), - tracker.checkpointStartTime(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten(), - 
tracker.recoveryDataSize(), - dbMgr.forAllPageStores(PageStore::size), - dbMgr.forAllPageStores(PageStore::getSparseSize) - ); - } - } - - /** - * Creates a string of a range WAL segments. - * - * @param walRange Range of WAL segments. - * @return The message about how many WAL segments was between previous checkpoint and current one. - */ - private String walRangeStr(@Nullable IgniteBiTuple walRange) { - if (walRange == null) - return ""; - - String res; - - long startIdx = walRange.get1(); - long endIdx = walRange.get2(); - - if (endIdx < 0 || endIdx < startIdx) - res = "[]"; - else if (endIdx == startIdx) - res = "[" + endIdx + "]"; - else - res = "[" + startIdx + " - " + endIdx + "]"; - - return res; - } - /** * Processes all evicted partitions scheduled for destroy. * @@ -926,31 +753,6 @@ private void waitCheckpointEvent() { } } - /** - * @param tracker Checkpoint metrics tracker. - * @return Duration of possible JVM pause, if it was detected, or {@code -1} otherwise. - */ - private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) { - if (LongJVMPauseDetector.enabled()) { - if (tracker.lockWaitDuration() + tracker.lockHoldDuration() > longJvmPauseThreshold) { - long now = System.currentTimeMillis(); - - // We must get last wake up time before search possible pause in events map. - long wakeUpTime = pauseDetector.getLastWakeUpTime(); - - IgniteBiTuple lastLongPause = pauseDetector.getLastLongPause(); - - if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1()) - return lastLongPause.get2(); - - if (now - wakeUpTime > longJvmPauseThreshold) - return now - wakeUpTime; - } - } - - return -1L; - } - /** * Update the current checkpoint info from the scheduled one. */ @@ -995,6 +797,7 @@ private void startCheckpointProgress() { * * @deprecated Should be rewritten to public API. 
*/ + @Deprecated public IgniteInternalFuture enableCheckpoints(boolean enable) { GridFutureAdapter fut = new GridFutureAdapter<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java deleted file mode 100644 index f8ba2d1adcbcf..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.pagemem; - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; -import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType; -import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage; -import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; - -/** - * Tracks various checkpoint phases and stats. 
- * - * Assumed sequence of events: - * <ol> - * <li>Checkpoint start</li> - * <li>CP Lock wait start</li> - * <li>CP mark start</li> - * <li>CP Lock release</li> - * <li>Pages write start</li> - * <li>fsync start</li> - * <li>Checkpoint end</li> - * </ol>
- */ -public class CheckpointMetricsTracker { - /** */ - private static final AtomicIntegerFieldUpdater DATA_PAGES_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "dataPages"); - - /** */ - private static final AtomicIntegerFieldUpdater COW_PAGES_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages"); - - /** */ - private volatile int dataPages; - - /** */ - private volatile int cowPages; - - /** */ - private final long cpStart = System.currentTimeMillis(); - - /** */ - private long cpLockWaitStart; - - /** */ - private long cpMarkStart; - - /** */ - private long cpMarkEnd; - - /** */ - private long cpLockRelease; - - /** */ - private long cpPagesWriteStart; - - /** */ - private long cpFsyncStart; - - /** */ - private long cpEnd; - - /** */ - private long walCpRecordFsyncStart; - - /** */ - private long walCpRecordFsyncEnd; - - /** */ - private long cpMarkerStoreEnd; - - /** */ - private long splitAndSortCpPagesEnd; - - /** */ - private long cpRecoveryDataWriteEnd; - - /** */ - private long cpRecoveryDataSize; - - /** */ - private long listenersExecEnd; - - /** - * Increments counter if copy on write page was written. - */ - public void onCowPageWritten() { - COW_PAGES_UPDATER.incrementAndGet(this); - } - - /** */ - public void onDataPageWritten() { - DATA_PAGES_UPDATER.incrementAndGet(this); - } - - /** - * @return COW pages. - */ - public int cowPagesWritten() { - return cowPages; - } - - /** - * @return Data pages written. 
- */ - public int dataPagesWritten() { - return dataPages; - } - - /** */ - public void onLockWaitStart() { - cpLockWaitStart = System.currentTimeMillis(); - } - - /** */ - public void onMarkStart() { - cpMarkStart = System.currentTimeMillis(); - } - - /** */ - public void onMarkEnd() { - cpMarkEnd = System.currentTimeMillis(); - } - - /** */ - public void onLockRelease() { - cpLockRelease = System.currentTimeMillis(); - } - - /** */ - public void onPagesWriteStart() { - cpPagesWriteStart = System.currentTimeMillis(); - } - - /** */ - public void onFsyncStart() { - cpFsyncStart = System.currentTimeMillis(); - } - - /** */ - public void onEnd() { - cpEnd = System.currentTimeMillis(); - } - - /** */ - public void onListenersExecuteEnd() { - listenersExecEnd = System.currentTimeMillis(); - } - - /** */ - public void onWalCpRecordFsyncStart() { - walCpRecordFsyncStart = System.currentTimeMillis(); - } - - /** */ - public void onCpMarkerStoreEnd() { - cpMarkerStoreEnd = System.currentTimeMillis(); - } - - /** */ - public void onSplitAndSortCpPagesEnd() { - splitAndSortCpPagesEnd = System.currentTimeMillis(); - } - - /** */ - public void onWalCpRecordFsyncEnd() { - walCpRecordFsyncEnd = System.currentTimeMillis(); - } - - /** */ - public void onWriteRecoveryDataEnd(long recoveryDataSize) { - cpRecoveryDataSize = recoveryDataSize; - cpRecoveryDataWriteEnd = System.currentTimeMillis(); - } - - /** - * @return Total checkpoint duration. - */ - public long totalDuration() { - return cpEnd - cpStart; - } - - /** - * @return Checkpoint lock wait duration. - */ - public long lockWaitDuration() { - return cpMarkStart - cpLockWaitStart; - } - - /** - * @return Checkpoint action before taken write lock duration. - */ - public long beforeLockDuration() { - return cpLockWaitStart - cpStart; - } - - /** - * @return Execution listeners under write lock duration. 
- */ - public long listenersExecuteDuration() { - return listenersExecEnd - cpMarkStart; - } - - /** - * @return Checkpoint mark duration. - */ - public long markDuration() { - return cpMarkEnd - cpMarkStart; - } - - /** - * @return Checkpoint lock hold duration. - */ - public long lockHoldDuration() { - return cpLockRelease - cpMarkStart; - } - - /** - * @return Pages write duration. - */ - public long pagesWriteDuration() { - return cpFsyncStart - cpPagesWriteStart; - } - - /** - * @return Checkpoint fsync duration. - */ - public long fsyncDuration() { - return cpEnd - cpFsyncStart; - } - - /** - * @return Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. - */ - public long walCpRecordFsyncDuration() { - return walCpRecordFsyncEnd - walCpRecordFsyncStart; - } - - /** - * @return Duration of splitting and sorting checkpoint pages. - */ - public long splitAndSortCpPagesDuration() { - return splitAndSortCpPagesEnd - walCpRecordFsyncEnd; - } - - /** - * @return Duration of writing recovery data. - */ - public long recoveryDataWriteDuration() { - return cpRecoveryDataWriteEnd - cpMarkEnd; - } - - /** - * @return Size of writing recovery data. - */ - public long recoveryDataSize() { - return cpRecoveryDataSize; - } - - /** - * @return Duration of checkpoint entry buffer writing to file. - * - * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) - */ - public long writeCheckpointEntryDuration() { - return cpMarkerStoreEnd - cpRecoveryDataWriteEnd; - } - - /** - * @return Checkpoint start time. 
- */ - public long checkpointStartTime() { - return cpStart; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 4fb148cd4d2dc..5ea09f3ede906 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; import org.apache.ignite.internal.util.function.ThrowableSupplier; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 9913aabd5cfa0..bc2c5b7c29221 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java index 45b436c8dabff..043f10fbb6a95 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java @@ -17,11 +17,16 @@ package org.apache.ignite.internal; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; import org.junit.Test; /** @@ -85,4 +90,62 @@ public void testStopWorkerThread() throws Exception { assertFalse(interruptLsnr.check()); assertTrue(stopLsnr.check()); } + + /** + * This test will create, check logic, + * check report gethering and gracefull shutdown + */ + @Test + public void testFullCycle() throws InterruptedException { + long[] monotonicTimes = new long[] { + 0, // init field + 100, // first renew of last wake-up time + 100, // check we didn't wake up spuriously + 100, // we need to get real wait time, so now time is + 50_000_100, // so we waited 50 ms, and previous time is 100 ns, so it's that time now + 550_000_100}; // something happend! Waited for 500 ms (equals to 500ms + previous nanotime)! 
+ CountDownLatch cntDownLatch = new CountDownLatch(7); + LongJVMPauseDetector longJVMPauseDetector = getLongJVMPauseDetector(monotonicTimes, cntDownLatch); + longJVMPauseDetector.start(); + assertTrue(cntDownLatch.await(10, TimeUnit.SECONDS)); + assertEquals(500, longJVMPauseDetector.longPausesTotalDuration()); + assertEquals(1, longJVMPauseDetector.longPausesCount()); + Map longPauseEvts = longJVMPauseDetector.longPauseEvents(); + assertTrue(longPauseEvts.containsValue(500L)); + Optional spottedPausesExplain = longJVMPauseDetector.getTotalSpottedPausesExplain(100); + assertTrue(spottedPausesExplain.isPresent()); + assertTrue(spottedPausesExplain.get().matches("Pause detecor spotted 1 pauses: \\[500 ms at \\d+;]. Each with precision 50 ms")); + longJVMPauseDetector.stop(); + } + + /** + * @param monotonicTimes Monotonic times. Answers for method + * {@link org.apache.ignite.internal.LongJVMPauseDetector#getMonotonicTimeNanos()} + * @param cntDownLatch Count down latch. + */ + private @NotNull LongJVMPauseDetector getLongJVMPauseDetector(long[] monotonicTimes, CountDownLatch cntDownLatch) { + return new LongJVMPauseDetector("test-instance", listeningTestLogger) { + private int visitCounter; + /** {@inheritDoc} */ + @Override protected long getMonotonicTimeNanos() { + synchronized (this) { + visitCounter++; + if (visitCounter - 1 >= monotonicTimes.length) { + cntDownLatch.countDown(); + while (!Thread.currentThread().isInterrupted()) { + try { + wait(Long.MAX_VALUE); + } + catch (InterruptedException ignored) { + // do nothing + } + } + return 0; + } + cntDownLatch.countDown(); + return monotonicTimes[visitCounter - 1]; + } + } + }; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index ea72d1b9c1864..d714c33c2d539 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;