From 8c024f9975a07c6efb8d1398b3ccb8fd1bf81e3a Mon Sep 17 00:00:00 2001 From: Egor Baranov Date: Fri, 29 May 2026 14:47:15 +0300 Subject: [PATCH 1/4] IGNITE-27232 Move time elapsed calculation to monotonic time and add pause explains. --- .../ignite/internal/LongJVMPauseDetector.java | 134 +++++++++------- .../persistence/checkpoint/Checkpointer.java | 80 +++++++--- .../pagemem/CheckpointMetricsTracker.java | 147 +++++++++++++----- 3 files changed, 247 insertions(+), 114 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java index aa9d6abb77557..814e3fdca4a56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java @@ -17,17 +17,20 @@ package org.apache.ignite.internal; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_DISABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT; @@ -56,9 +59,12 @@ public class LongJVMPauseDetector { public static final int DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT = 20; /** Precision. */ - private static final int PRECISION = + private static final long PRECISION = getInteger(IGNITE_JVM_PAUSE_DETECTOR_PRECISION, DFLT_JVM_PAUSE_DETECTOR_PRECISION); + /** Precision. */ + private static final long PRECISION_NANOS = PRECISION * 1_000_000L; + /** Threshold. */ private static final int THRESHOLD = getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); @@ -83,15 +89,18 @@ public class LongJVMPauseDetector { private long longPausesCnt; /** Long pause total duration. */ - private long longPausesTotalDuration; + private long longPausesTotalDurationNanos; /** Last detector's wake up time. */ - private long lastWakeUpTime; + private long lastWakeUpTimeNanos = System.nanoTime(); /** Long pauses timestamps. */ @GridToStringInclude private final long[] longPausesTimestamps = new long[EVT_CNT]; + @GridToStringExclude + private final long[] longPausesMonotonicTimestamps = new long[EVT_CNT]; + /** Long pauses durations. */ @GridToStringInclude private final long[] longPausesDurations = new long[EVT_CNT]; @@ -117,53 +126,41 @@ public void start() { } final Thread worker = new IgniteThread(igniteInstanceName, "jvm-pause-detector-worker", () -> { - synchronized (this) { - lastWakeUpTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - if (log.isDebugEnabled()) log.debug(Thread.currentThread().getName() + " has been started."); - while (true) { - try { - Thread.sleep(PRECISION); - - final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - final long pause = now - PRECISION - lastWakeUpTime; - - if (pause >= THRESHOLD) { - log.warning("Possible too long JVM pause: " + pause + " milliseconds."); - + try { + // don't worry, wait will release monitor and all props will be accessible synchronized (this) { - final int next = (int)(longPausesCnt % EVT_CNT); - - longPausesCnt++; - - longPausesTotalDuration += pause; - - longPausesTimestamps[next] = now; - - longPausesDurations[next] = pause; - - lastWakeUpTime = now; - } - } - else { - synchronized (this) { - lastWakeUpTime = now; + lastWakeUpTimeNanos = System.nanoTime(); + long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; + long awaitDeadlineMillis = awaitDeadline / 1_000_000L; + int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000); + while (System.nanoTime() <= awaitDeadline) + wait(awaitDeadlineMillis, awaitDeadlineNanos); + long nanoTime = System.nanoTime(); + long pause = nanoTime - awaitDeadline; + long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause); + if (pauseMillis >= THRESHOLD) { + log.warning("Possible too long JVM pause: " + pauseMillis + " ms. " + + "Precision: " + PRECISION + " ms."); + final int next = (int) (longPausesCnt++ % EVT_CNT); + longPausesTotalDurationNanos += pause; + longPausesTimestamps[next] = System.currentTimeMillis(); + longPausesMonotonicTimestamps[next] = nanoTime; + longPausesDurations[next] = pauseMillis; + } } - } - } - catch (InterruptedException e) { - Thread locThread = Thread.currentThread(); + } catch (InterruptedException e) { + Thread locThread = Thread.currentThread(); - if (workerRef.compareAndSet(locThread, null)) - log.error(locThread.getName() + " has been interrupted.", e); - else if (log.isDebugEnabled()) - log.debug(locThread.getName() + " has been stopped."); + if (workerRef.compareAndSet(locThread, null)) + log.error(locThread.getName() + " has been interrupted.", e); + else if (log.isDebugEnabled()) + log.debug(locThread.getName() + " has been stopped."); - break; - } + break; + } } }); @@ -209,14 +206,14 @@ synchronized long longPausesCount() { * @return Long JVM pauses total duration. */ synchronized long longPausesTotalDuration() { - return longPausesTotalDuration; + return TimeUnit.NANOSECONDS.toMillis(longPausesTotalDurationNanos); } /** * @return Last checker's wake up time. */ - public synchronized long getLastWakeUpTime() { - return lastWakeUpTime; + public synchronized long getLastWakeUpTimeNanos() { + return lastWakeUpTimeNanos; } /** @@ -232,16 +229,41 @@ synchronized Map longPauseEvents() { } /** - * @return Pair ({@code last long pause event time}, {@code pause time duration}) or {@code null}, if long pause - * wasn't occurred. + * @return last long pause spotted or -1 otherwise */ - public synchronized @Nullable IgniteBiTuple getLastLongPause() { - int lastPauseIdx = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT); - - if (longPausesTimestamps[lastPauseIdx] == 0) - return null; + public synchronized long getLastLongPause() { + int lastPauseIdx = Math.toIntExact(longPausesCnt % EVT_CNT); + long lastLongPause = longPausesDurations[lastPauseIdx]; + return lastLongPause == 0 ? -1 : lastLongPause; + } - return new IgniteBiTuple<>(longPausesTimestamps[lastPauseIdx], longPausesDurations[lastPauseIdx]); + /** + * @param tracker Check point Tracker. + * @return Tries to explain total pauses spotted during check point process + * since {@link CheckpointMetricsTracker#checkPointStartNanos()} + * due to {@link CheckpointMetricsTracker#checkPointEndNanos()} + * or {@link Optional#empty()} if none was found + */ + public synchronized String getTotalSpottedPausesExplain(CheckpointMetricsTracker tracker) { + int lastPointer = (int) (longPausesCnt % EVT_CNT); + int pausesSpottedTimes = 0; + int curPointer = lastPointer; + List pausesExplains = new LinkedList<>(); + long checkPointStartNanos = tracker.checkPointStartNanos(); + do { + if (longPausesMonotonicTimestamps[curPointer] <= checkPointStartNanos) + break; + pausesExplains.add(String.format( + "%d ms at %d", + longPausesDurations[curPointer], + longPausesTimestamps[curPointer])); + pausesSpottedTimes++; + curPointer = curPointer == 0 ? EVT_CNT - 1 : curPointer - 1; + } while (curPointer != lastPointer); + return String.format("Pause detecor spotted %d pauses: %s. Each with precision %d ms", + pausesSpottedTimes, + pausesExplains, + PRECISION); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index bd5c3c9f29811..3d81fc6a032b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -18,10 +18,15 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import java.io.File; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; @@ -32,6 +37,10 @@ import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; + +import com.sun.management.GcInfo; +import com.sun.management.internal.GarbageCollectorExtImpl; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -122,6 +131,9 @@ public class Checkpointer extends GridWorker { /** Timeout between partition file destroy and checkpoint to handle it. */ private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds. + /** Jvm pause explain pattern. */ + private static final String JVM_PAUSE_EXPLAIN_PATTERN = "Possible JVM Pause explaination: [ %s ]"; + /** Avoid the start checkpoint if checkpointer was canceled. */ private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); @@ -465,8 +477,6 @@ private void doCheckpoint() { if (chp.hasDelta()) { if (log.isInfoEnabled()) { - long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker); - log.info( String.format( CHECKPOINT_STARTED_LOG_FORMAT, @@ -480,7 +490,7 @@ private void doCheckpoint() { tracker.splitAndSortCpPagesDuration(), tracker.recoveryDataWriteDuration(), tracker.writeCheckpointEntryDuration(), - possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "", + possibleLongJvmPauseExplaination(tracker), chp.pagesSize, chp.progress.reason() ) @@ -928,27 +938,54 @@ private void waitCheckpointEvent() { /** * @param tracker Checkpoint metrics tracker. - * @return Duration of possible JVM pause, if it was detected, or {@code -1} otherwise. + * @return Explain possible JVM pause. */ - private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) { - if (LongJVMPauseDetector.enabled()) { - if (tracker.lockWaitDuration() + tracker.lockHoldDuration() > longJvmPauseThreshold) { - long now = System.currentTimeMillis(); - - // We must get last wake up time before search possible pause in events map. - long wakeUpTime = pauseDetector.getLastWakeUpTime(); - - IgniteBiTuple lastLongPause = pauseDetector.getLastLongPause(); - - if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1()) - return lastLongPause.get2(); - - if (now - wakeUpTime > longJvmPauseThreshold) - return now - wakeUpTime; - } + private String possibleLongJvmPauseExplaination(CheckpointMetricsTracker tracker) { + long lockDurationMillis = tracker.lockDurationMillis(); + if (LongJVMPauseDetector.enabled() && lockDurationMillis > longJvmPauseThreshold) { + List explains = new LinkedList<>(); + explains.add(String.format("Checkpoint lock took %d ms", lockDurationMillis)); + explains.addAll(getGCExplains(tracker)); + explains.add(pauseDetector.getTotalSpottedPausesExplain(tracker)); + return String.format(JVM_PAUSE_EXPLAIN_PATTERN, String.join("; ", explains)) + ", "; } + return ""; + } - return -1L; + /** + * @param tracker Checkpoint Tracker. + * @return Collection of every last GC if it happened within checkpoint process bounds + */ + private Collection getGCExplains(CheckpointMetricsTracker tracker) { + return ManagementFactory.getGarbageCollectorMXBeans() + .stream() + .filter(GarbageCollectorExtImpl.class::isInstance) + .map(GarbageCollectorExtImpl.class::cast) + .map(mxBean -> getGCExplain(mxBean, tracker)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + /** + * @param mxBean GC Mx bean. + * @param tracker Check point Tracker. + * @return GC explain if it happend within check point process bounds + */ + private Optional getGCExplain(GarbageCollectorExtImpl mxBean, CheckpointMetricsTracker tracker) { + String gcName = mxBean.getName(); + GcInfo lastGcInfo = mxBean.getLastGcInfo(); + RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + long uptimeNanos = runtimeMXBean.getUptime() * 1_000_000L; + long checkPointStartNanos = tracker.checkPointStartNanos(); + long checkPointEndNanos = tracker.checkPointEndNanos(); + long monotonicStartTimeNanos = System.nanoTime() - uptimeNanos; + long monotonicGCStartTimeNanos = monotonicStartTimeNanos + lastGcInfo.getStartTime() * 1_000_000L; + long monotonicGCEndTimeNanos = monotonicStartTimeNanos + lastGcInfo.getEndTime() * 1_000_000L; + if (checkPointStartNanos >= monotonicGCEndTimeNanos || checkPointEndNanos <= monotonicGCStartTimeNanos) + return Optional.empty(); + long duration = lastGcInfo.getDuration(); + return Optional.of(String.format("%s most recent GC took %d ms", gcName, duration)); } /** @@ -995,6 +1032,7 @@ private void startCheckpointProgress() { * * @deprecated Should be rewritten to public API. */ + @Deprecated public IgniteInternalFuture enableCheckpoints(boolean enable) { GridFutureAdapter fut = new GridFutureAdapter<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java index f8ba2d1adcbcf..dc2de3bffeefc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType; @@ -54,7 +55,13 @@ public class CheckpointMetricsTracker { private volatile int cowPages; /** */ - private final long cpStart = System.currentTimeMillis(); + private final long cpStart = System.nanoTime(); + + /** */ + private long cpEnd; + + /** Epoch time of checkpoint process start */ + private final long cpStartEpochTime = System.currentTimeMillis(); /** */ private long cpLockWaitStart; @@ -65,18 +72,12 @@ public class CheckpointMetricsTracker { /** */ private long cpMarkEnd; - /** */ - private long cpLockRelease; - /** */ private long cpPagesWriteStart; /** */ private long cpFsyncStart; - /** */ - private long cpEnd; - /** */ private long walCpRecordFsyncStart; @@ -84,19 +85,52 @@ public class CheckpointMetricsTracker { private long walCpRecordFsyncEnd; /** */ - private long cpMarkerStoreEnd; + private long cpRecoveryDataWriteEnd; /** */ - private long splitAndSortCpPagesEnd; + private long cpRecoveryDataSize; /** */ - private long cpRecoveryDataWriteEnd; + private long totalDuration; /** */ - private long cpRecoveryDataSize; + private long totalDurationNanos; /** */ - private long listenersExecEnd; + private long fsyncDuration; + + /** */ + private long writeCheckpointEntryDuration; + + /** */ + private long recoveryDataWriteDuration; + + /** */ + private long splitAndSortCpPagesDuration; + + /** */ + private long walCpRecordFsyncDuration; + + /** */ + private long pagesWriteDuration; + + /** */ + private long lockDurationMillis; + + /** */ + private long lockHoldDuration; + + /** */ + private long markDuration; + + /** */ + private long listenersExecDuration; + + /** */ + private long beforeLockDuration; + + /** */ + private long lockWaitDuration; /** * Increments counter if copy on write page was written. @@ -126,145 +160,170 @@ public int dataPagesWritten() { /** */ public void onLockWaitStart() { - cpLockWaitStart = System.currentTimeMillis(); + cpLockWaitStart = System.nanoTime(); + beforeLockDuration = TimeUnit.NANOSECONDS.toMillis(cpLockWaitStart - cpStart); } /** */ public void onMarkStart() { - cpMarkStart = System.currentTimeMillis(); + cpMarkStart = System.nanoTime(); + lockWaitDuration = TimeUnit.NANOSECONDS.toMillis(cpMarkStart - cpLockWaitStart); } /** */ public void onMarkEnd() { - cpMarkEnd = System.currentTimeMillis(); + cpMarkEnd = System.nanoTime(); + markDuration = TimeUnit.NANOSECONDS.toMillis(cpMarkEnd - cpMarkStart); } /** */ public void onLockRelease() { - cpLockRelease = System.currentTimeMillis(); + long nanoTime = System.nanoTime(); + lockHoldDuration = TimeUnit.NANOSECONDS.toMillis(nanoTime - cpMarkStart); + lockDurationMillis = TimeUnit.NANOSECONDS.toMillis(nanoTime - cpLockWaitStart); } /** */ public void onPagesWriteStart() { - cpPagesWriteStart = System.currentTimeMillis(); + cpPagesWriteStart = System.nanoTime(); } /** */ public void onFsyncStart() { - cpFsyncStart = System.currentTimeMillis(); + cpFsyncStart = System.nanoTime(); + pagesWriteDuration = TimeUnit.NANOSECONDS.toMillis(cpFsyncStart - cpPagesWriteStart); } /** */ public void onEnd() { - cpEnd = System.currentTimeMillis(); + cpEnd = System.nanoTime(); + totalDurationNanos = cpEnd - cpStart; + totalDuration = TimeUnit.NANOSECONDS.toMillis(totalDurationNanos); + fsyncDuration = TimeUnit.NANOSECONDS.toMillis(cpEnd - cpFsyncStart); } /** */ public void onListenersExecuteEnd() { - listenersExecEnd = System.currentTimeMillis(); + listenersExecDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cpMarkStart); } /** */ public void onWalCpRecordFsyncStart() { - walCpRecordFsyncStart = System.currentTimeMillis(); + walCpRecordFsyncStart = System.nanoTime(); } /** */ public void onCpMarkerStoreEnd() { - cpMarkerStoreEnd = System.currentTimeMillis(); + writeCheckpointEntryDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cpRecoveryDataWriteEnd); } /** */ public void onSplitAndSortCpPagesEnd() { - splitAndSortCpPagesEnd = System.currentTimeMillis(); + splitAndSortCpPagesDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - walCpRecordFsyncEnd); } /** */ public void onWalCpRecordFsyncEnd() { - walCpRecordFsyncEnd = System.currentTimeMillis(); + walCpRecordFsyncEnd = System.nanoTime(); + walCpRecordFsyncDuration = TimeUnit.NANOSECONDS.toMillis(walCpRecordFsyncEnd - walCpRecordFsyncStart); } /** */ public void onWriteRecoveryDataEnd(long recoveryDataSize) { cpRecoveryDataSize = recoveryDataSize; - cpRecoveryDataWriteEnd = System.currentTimeMillis(); + cpRecoveryDataWriteEnd = System.nanoTime(); + recoveryDataWriteDuration = TimeUnit.NANOSECONDS.toMillis(cpRecoveryDataWriteEnd - cpMarkEnd); } /** * @return Total checkpoint duration. */ public long totalDuration() { - return cpEnd - cpStart; + return totalDuration; + } + + /** + * @return Total checkpoint duration (nanos). + */ + public long totalDurationNanos() { + return totalDurationNanos; } /** * @return Checkpoint lock wait duration. */ public long lockWaitDuration() { - return cpMarkStart - cpLockWaitStart; + return lockWaitDuration; } /** * @return Checkpoint action before taken write lock duration. */ public long beforeLockDuration() { - return cpLockWaitStart - cpStart; + return beforeLockDuration; } /** * @return Execution listeners under write lock duration. */ public long listenersExecuteDuration() { - return listenersExecEnd - cpMarkStart; + return listenersExecDuration; } /** * @return Checkpoint mark duration. */ public long markDuration() { - return cpMarkEnd - cpMarkStart; + return markDuration; } /** * @return Checkpoint lock hold duration. */ public long lockHoldDuration() { - return cpLockRelease - cpMarkStart; + return lockHoldDuration; + } + + /** + * @return Checkpoint lock duration + */ + public long lockDurationMillis() { + return lockDurationMillis; } /** * @return Pages write duration. */ public long pagesWriteDuration() { - return cpFsyncStart - cpPagesWriteStart; + return pagesWriteDuration; } /** * @return Checkpoint fsync duration. */ public long fsyncDuration() { - return cpEnd - cpFsyncStart; + return fsyncDuration; } /** * @return Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. */ public long walCpRecordFsyncDuration() { - return walCpRecordFsyncEnd - walCpRecordFsyncStart; + return walCpRecordFsyncDuration; } /** * @return Duration of splitting and sorting checkpoint pages. */ public long splitAndSortCpPagesDuration() { - return splitAndSortCpPagesEnd - walCpRecordFsyncEnd; + return splitAndSortCpPagesDuration; } /** * @return Duration of writing recovery data. */ public long recoveryDataWriteDuration() { - return cpRecoveryDataWriteEnd - cpMarkEnd; + return recoveryDataWriteDuration; } /** @@ -280,13 +339,27 @@ public long recoveryDataSize() { * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) */ public long writeCheckpointEntryDuration() { - return cpMarkerStoreEnd - cpRecoveryDataWriteEnd; + return writeCheckpointEntryDuration; } /** * @return Checkpoint start time. */ public long checkpointStartTime() { + return cpStartEpochTime; + } + + /** + * @return monotonic time in nanos at check point process start + */ + public long checkPointStartNanos() { return cpStart; } + + /** + * @return monotonic time in nanos at check point process end + */ + public long checkPointEndNanos() { + return cpEnd; + } } From b82ed0f749007e93452cbb64a4d3cfba53fc9003 Mon Sep 17 00:00:00 2001 From: Egor Baranov Date: Mon, 1 Jun 2026 14:37:55 +0300 Subject: [PATCH 2/4] IGNITE-27232 Added enum for duration entries, optimized duration calculation --- .../ignite/internal/LongJVMPauseDetector.java | 76 +-- .../checkpoint/CheckpointMetricsTracker.java | 625 ++++++++++++++++++ .../checkpoint/CheckpointPagesWriter.java | 1 - .../CheckpointPagesWriterFactory.java | 1 - .../checkpoint/CheckpointWorkflow.java | 1 - .../persistence/checkpoint/Checkpointer.java | 251 +------ .../pagemem/CheckpointMetricsTracker.java | 365 ---------- .../persistence/pagemem/PageMemoryEx.java | 1 + .../persistence/pagemem/PageMemoryImpl.java | 1 + .../pagemem/PageMemoryImplTest.java | 1 + 10 files changed, 668 insertions(+), 655 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java index 814e3fdca4a56..9b2467ce86161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; @@ -26,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -92,7 +90,7 @@ public class LongJVMPauseDetector { private long longPausesTotalDurationNanos; /** Last detector's wake up time. */ - private long lastWakeUpTimeNanos = System.nanoTime(); + private long lastWakeUpTimeNanos = getMonotonicTimeNanos(); /** Long pauses timestamps. */ @GridToStringInclude @@ -132,13 +130,13 @@ public void start() { try { // don't worry, wait will release monitor and all props will be accessible synchronized (this) { - lastWakeUpTimeNanos = System.nanoTime(); + lastWakeUpTimeNanos = getMonotonicTimeNanos(); long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; long awaitDeadlineMillis = awaitDeadline / 1_000_000L; int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000); - while (System.nanoTime() <= awaitDeadline) + while (getMonotonicTimeNanos() <= awaitDeadline) wait(awaitDeadlineMillis, awaitDeadlineNanos); - long nanoTime = System.nanoTime(); + long nanoTime = getMonotonicTimeNanos(); long pause = nanoTime - awaitDeadline; long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause); if (pauseMillis >= THRESHOLD) { @@ -209,13 +207,6 @@ synchronized long longPausesTotalDuration() { return TimeUnit.NANOSECONDS.toMillis(longPausesTotalDurationNanos); } - /** - * @return Last checker's wake up time. - */ - public synchronized long getLastWakeUpTimeNanos() { - return lastWakeUpTimeNanos; - } - /** * @return Last long JVM pause events. */ @@ -229,45 +220,42 @@ synchronized Map longPauseEvents() { } /** - * @return last long pause spotted or -1 otherwise - */ - public synchronized long getLastLongPause() { - int lastPauseIdx = Math.toIntExact(longPausesCnt % EVT_CNT); - long lastLongPause = longPausesDurations[lastPauseIdx]; - return lastLongPause == 0 ? -1 : lastLongPause; - } - - /** - * @param tracker Check point Tracker. + * @param cpStart Check point start time in nanos. * @return Tries to explain total pauses spotted during check point process - * since {@link CheckpointMetricsTracker#checkPointStartNanos()} - * due to {@link CheckpointMetricsTracker#checkPointEndNanos()} - * or {@link Optional#empty()} if none was found + * or {@link Optional#empty()} if none were found */ - public synchronized String getTotalSpottedPausesExplain(CheckpointMetricsTracker tracker) { + public Optional getTotalSpottedPausesExplain(long cpStart) { int lastPointer = (int) (longPausesCnt % EVT_CNT); int pausesSpottedTimes = 0; int curPointer = lastPointer; - List pausesExplains = new LinkedList<>(); - long checkPointStartNanos = tracker.checkPointStartNanos(); - do { - if (longPausesMonotonicTimestamps[curPointer] <= checkPointStartNanos) - break; - pausesExplains.add(String.format( - "%d ms at %d", - longPausesDurations[curPointer], - longPausesTimestamps[curPointer])); - pausesSpottedTimes++; - curPointer = curPointer == 0 ? EVT_CNT - 1 : curPointer - 1; - } while (curPointer != lastPointer); - return String.format("Pause detecor spotted %d pauses: %s. Each with precision %d ms", - pausesSpottedTimes, - pausesExplains, - PRECISION); + StringBuilder explainBuilder = new StringBuilder(); + synchronized (this) { + do { + if (longPausesMonotonicTimestamps[curPointer] <= cpStart) + break; + explainBuilder.append(longPausesDurations[curPointer]).append(" ms at ") + .append(longPausesTimestamps[curPointer]).append(";"); + pausesSpottedTimes++; + curPointer = curPointer == 0 ? EVT_CNT - 1 : curPointer - 1; + } while (curPointer != lastPointer); + } + return pausesSpottedTimes == 0 ? + Optional.empty() : + Optional.of(String.format("Pause detecor spotted %d pauses: [%s]. Each with precision %d ms", + pausesSpottedTimes, + explainBuilder, + PRECISION)); } /** {@inheritDoc} */ @Override public String toString() { return S.toString(LongJVMPauseDetector.class, this); } + + /** + * @return monotonic time in nanos + */ + protected static long getMonotonicTimeNanos() { + return System.nanoTime(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java new file mode 100644 index 0000000000000..9c2036a157ea3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.checkpoint; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.Collection; +import java.util.EnumMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; +import java.util.stream.Collectors; +import com.sun.management.GcInfo; +import com.sun.management.internal.GarbageCollectorExtImpl; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.LongJVMPauseDetector; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteCommonsSystemProperties.getInteger; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD; +import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD; + +/** + * Tracks various checkpoint phases and stats. + * + * Assumed sequence of events: + *
    + *
  1. Checkpoint start
  2. + *
  3. CP Lock wait start
  4. + *
  5. CP mark start
  6. + *
  7. CP Lock release
  8. + *
  9. Pages write start
  10. + *
  11. fsync start
  12. + *
  13. Checkpoint end
  14. + *
+ */ +public class CheckpointMetricsTracker { + /** Jvm pause explain pattern. */ + private static final String JVM_PAUSE_EXPLAIN_PATTERN = "Possible JVM Pause explaination: [ %s ]"; + + /** Checkpoint started log message format. */ + private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" + + "checkpointId=%s, " + + "startPtr=%s, " + + "checkpointBeforeLockTime=%dms, " + + "checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, " + + "checkpointLockHoldTime=%dms, " + + "walCpRecordFsyncDuration=%dms, " + + "splitAndSortCpPagesDuration=%dms, " + + "writeRecoveryDataDuration=%dms, " + + "writeCheckpointEntryDuration=%dms, " + + "%s" + + "pages=%d, " + + "reason='%s']"; + + /** Executor service to provide calculations and metrics export */ + public static final ExecutorService CHECK_POINT_METRICS_EXPORT_EXECUTOR = + Executors.newSingleThreadExecutor((runnable) -> { + Thread thread = new Thread(runnable); + try { + thread.setDaemon(true); + thread.setName("check-point-metrics-export-thread"); + } catch (SecurityException ignored) { + // do nothing + } + return thread; + }); + + static { + Runtime runtime = Runtime.getRuntime(); + Thread shutdownExecutor = new Thread(() -> { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.shutdown(); + try { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + runtime.addShutdownHook(shutdownExecutor); + } + + /** */ + private static final AtomicIntegerFieldUpdater DATA_PAGES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "dataPages"); + + /** */ + private static final AtomicIntegerFieldUpdater COW_PAGES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages"); + + + /** Long JVM pause threshold. */ + private final int longJvmPauseThreshold = + getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); + + /** Ignite logger. */ + private final IgniteLogger log; + + /** Previously calculated duration storage. */ + private final EnumMap durationsStorage = new EnumMap<>(Duration.class); + + /** Pause detector. */ + private final LongJVMPauseDetector pauseDetector; + + /** */ + private volatile int dataPages; + + /** */ + private volatile int cowPages; + + /** */ + private final long cpStart = System.nanoTime(); + + /** Epoch time of checkpoint process start */ + private final long cpStartEpochTime = System.currentTimeMillis(); + + /** */ + private long cpLockWaitStart; + + /** */ + private long cpMarkStart; + + /** */ + private long cpMarkEnd; + + /** */ + private long cpLockRelease; + + /** */ + private long cpPagesWriteStart; + + /** */ + private long cpFsyncStart; + + /** */ + private long cpEnd; + + /** */ + private long walCpRecordFsyncStart; + + /** */ + private long walCpRecordFsyncEnd; + + /** */ + private long cpMarkerStoreEnd; + + /** */ + private long splitAndSortCpPagesEnd; + + /** */ + private long cpRecoveryDataWriteEnd; + + /** */ + private long cpRecoveryDataSize; + + /** */ + private long listenersExecEnd; + + /** + * @param log - current ignite logger instance + */ + public CheckpointMetricsTracker(IgniteLogger log, LongJVMPauseDetector pauseDetector) { + this.log = log; + this.pauseDetector = pauseDetector; + } + + /** + * Increments counter if copy on write page was written. + */ + public void onCowPageWritten() { + COW_PAGES_UPDATER.incrementAndGet(this); + } + + /** */ + public void onDataPageWritten() { + DATA_PAGES_UPDATER.incrementAndGet(this); + } + + /** + * @return COW pages. + */ + public int cowPagesWritten() { + return cowPages; + } + + /** + * @return Data pages written. + */ + public int dataPagesWritten() { + return dataPages; + } + + /** */ + public void onLockWaitStart() { + cpLockWaitStart = System.nanoTime(); + } + + /** */ + public void onMarkStart() { + cpMarkStart = System.nanoTime(); + } + + /** */ + public void onMarkEnd() { + cpMarkEnd = System.nanoTime(); + } + + /** */ + public void onLockRelease() { + cpLockRelease = System.nanoTime(); + } + + /** */ + public void onPagesWriteStart() { + cpPagesWriteStart = System.nanoTime(); + } + + /** */ + public void onFsyncStart() { + cpFsyncStart = System.nanoTime(); + } + + /** */ + public void onEnd() { + cpEnd = System.nanoTime(); + } + + /** */ + public void onListenersExecuteEnd() { + listenersExecEnd = System.nanoTime(); + } + + /** */ + public void onWalCpRecordFsyncStart() { + walCpRecordFsyncStart = System.nanoTime(); + } + + /** */ + public void onCpMarkerStoreEnd() { + cpMarkerStoreEnd = System.nanoTime(); + } + + /** */ + public void onSplitAndSortCpPagesEnd() { + splitAndSortCpPagesEnd = System.nanoTime(); + } + + /** */ + public void onWalCpRecordFsyncEnd() { + walCpRecordFsyncEnd = System.nanoTime(); + } + + /** */ + public void onWriteRecoveryDataEnd(long recoveryDataSize) { + cpRecoveryDataSize = recoveryDataSize; + cpRecoveryDataWriteEnd = System.nanoTime(); + } + + /** + * Tries to log message with checkpoint recovery data write start details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckPointRecoveryDataWriteStart(Checkpoint chp) { + int pagesSize = chp.pagesSize; + Object checkpointId = chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(); + Object checkpointMark = chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(); + String recoveryDataWriteStartReason = chp.progress.reason(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info(String.format("Checkpoint recovery data write started [" + + "checkpointId=%s, " + + "startPtr=%s, " + + "pages=%d, " + + "checkpointBeforeLockTime=%dms, " + + "checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, " + + "checkpointLockHoldTime=%dms, " + + "walCpRecordFsyncDuration=%dms, " + + "splitAndSortCpPagesDuration=%dms, " + + "reason='%s']", + checkpointId, + checkpointMark, + pagesSize, + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + recoveryDataWriteStartReason + )); + } + }); + } + + /** + * Tries to log message with checkpoint start details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckPointStart(Checkpoint chp) { + int pagesSize = chp.pagesSize; + String checkPointStartReason = chp.progress.reason(); + Object checkpointId = chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(); + Object checkpointMark = chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info( + String.format( + CHECKPOINT_STARTED_LOG_FORMAT, + checkpointId, + checkpointMark, + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + possibleLongJvmPauseExplaination(), + pagesSize, + checkPointStartReason)); + } + }); + } + + /** + * Tries to log message with checkpoint finish details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckpointFinish(Checkpoint chp) { + int pagesSize = chp.pagesSize; + Object checkpointId = chp.cpEntry != null ? chp.cpEntry.checkpointId() : ""; + Object checkpointMark = chp.cpEntry != null ? chp.cpEntry.checkpointMark() : ""; + IgniteBiTuple walSegsCoveredRange = chp.walSegsCoveredRange; + invokeLater(() -> { + if (log.isInfoEnabled()) { + log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + + "walSegmentsCovered=%s, markDuration=%dms, recoveryWrite=%dms, pagesWrite=%dms, " + + "fsync=%dms, total=%dms]", + checkpointId, + pagesSize, + checkpointMark, + walRangeStr(walSegsCoveredRange), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.TOTAL))); + } + }); + } + + /** + * Tries to log message with checkpoint skip details + * @param chp Checkpoint (This metrics owner) + */ + void logCheckpointSkip(Checkpoint chp) { + String skipReason = chp.progress.reason(); + invokeLater(() -> { + if (log.isInfoEnabled()) { + LT.info(log, String.format( + "Skipping checkpoint (no pages were modified) [" + + "checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, " + + "checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']", + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.LOCK_HOLD), + skipReason) + ); + } + }); + } + + /** + * Updates performance statistics if feature is enabled + * @param chp Checkpoint (This metrics owner) + * @param psproc Performance statistics processor. + */ + void updatePerformanceStatistics(Checkpoint chp, PerformanceStatisticsProcessor psproc) { + int pagesSize = chp.pagesSize; + int dataPagesWritten = dataPagesWritten(); + int cowPagesWritten = cowPagesWritten(); + invokeLater(() -> { + if (psproc.enabled()) { + psproc.checkpoint( + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.TOTAL), + cpStartEpochTime, + pagesSize, + dataPagesWritten, + cowPagesWritten); + } + }); + } + + /** + * Persists metrics of Checkpoint if feature is enabled + * @param chp Checkpoint (This metrics owner) + * @param persStoreMetrics Metrics persistence storage. + * @param cacheProc Cache processor. + */ + void storeMetrics(Checkpoint chp, DataStorageMetricsImpl persStoreMetrics, GridCacheProcessor cacheProc) { + boolean metricsEnabled = persStoreMetrics.metricsEnabled(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager) cacheProc.context().database(); + int pagesSize = chp.pagesSize; + long pageStoresSize = dbMgr.forAllPageStores(PageStore::size); + long pageStoresSparseSize = dbMgr.forAllPageStores(PageStore::getSparseSize); + int cowPagesWritten = cowPagesWritten(); + int dataPagesWritten = dataPagesWritten(); + invokeLater(() -> { + if (metricsEnabled) { + persStoreMetrics.onCheckpoint( + getDurationMillis(Duration.BEFORE_LOCK), + getDurationMillis(Duration.LOCK_WAIT), + getDurationMillis(Duration.LISTENERS_EXECUTE), + getDurationMillis(Duration.MARK), + getDurationMillis(Duration.LOCK_HOLD), + getDurationMillis(Duration.PAGES_WRITE), + getDurationMillis(Duration.FSYNC), + getDurationMillis(Duration.WAL_CP_RECORD_FSYNC), + getDurationMillis(Duration.WRITE_CHECKPOINT_ENTRY), + getDurationMillis(Duration.SPLIT_AND_SORT_CP_PAGES), + getDurationMillis(Duration.RECOVERY_DATA_WRITE), + getDurationMillis(Duration.TOTAL), + cpStartEpochTime, + pagesSize, + dataPagesWritten, + cowPagesWritten, + cpRecoveryDataSize, + pageStoresSize, + pageStoresSparseSize + ); + } + }); + } + + /** + * Creates a string of a range WAL segments. + * + * @param walRange Range of WAL segments. + * @return The message about how many WAL segments was between previous checkpoint and current one. + */ + private String walRangeStr(@Nullable IgniteBiTuple walRange) { + if (walRange == null) + return ""; + + String res; + + long startIdx = walRange.get1(); + long endIdx = walRange.get2(); + + if (endIdx < 0 || endIdx < startIdx) + res = "[]"; + else if (endIdx == startIdx) + res = "[" + endIdx + "]"; + else + res = "[" + startIdx + " - " + endIdx + "]"; + + return res; + } + + + /** + * @return Explain possible JVM pause. + */ + private String possibleLongJvmPauseExplaination() { + long lockDuration = getDurationMillis(Duration.TOTAL_LOCK); + if (LongJVMPauseDetector.enabled() && lockDuration > longJvmPauseThreshold) { + List explains = new LinkedList<>(); + explains.add(String.format("Checkpoint lock took %d ms", lockDuration)); + explains.addAll(getGCExplains()); + pauseDetector.getTotalSpottedPausesExplain(cpStart).ifPresent(explains::add); + return String.format(JVM_PAUSE_EXPLAIN_PATTERN, String.join("; ", explains)) + ", "; + } + return ""; + } + + /** + * @return Collection of every last GC if it happened within checkpoint process bounds + */ + private Collection getGCExplains() { + return ManagementFactory.getGarbageCollectorMXBeans() + .stream() + .filter(GarbageCollectorExtImpl.class::isInstance) + .map(GarbageCollectorExtImpl.class::cast) + .map(this::getGCExplain) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + /** + * @param mxBean GC Mx bean. + * @return GC explain if it happend within check point process bounds + */ + private Optional getGCExplain(GarbageCollectorExtImpl mxBean) { + String gcName = mxBean.getName(); + GcInfo lastGcInfo = mxBean.getLastGcInfo(); + RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + long uptimeNanos = runtimeMXBean.getUptime() * 1_000_000L; + long monotonicStartTimeNanos = System.nanoTime() - uptimeNanos; + long monotonicGCStartTimeNanos = monotonicStartTimeNanos + lastGcInfo.getStartTime() * 1_000_000L; + long monotonicGCEndTimeNanos = monotonicStartTimeNanos + lastGcInfo.getEndTime() * 1_000_000L; + if (cpStart >= monotonicGCEndTimeNanos || cpEnd <= monotonicGCStartTimeNanos) + return Optional.empty(); + long duration = lastGcInfo.getDuration(); + return Optional.of(String.format("%s most recent GC took %d ms", gcName, duration)); + } + + /** + * Tries to execute task, will not throw {@link java.util.concurrent.RejectedExecutionException} + * @param runnable Runnable. + */ + private void invokeLater(Runnable runnable) { + try { + CHECK_POINT_METRICS_EXPORT_EXECUTOR.submit(runnable); + } catch (RejectedExecutionException ignored) { + // do nothing, looks like it is the end + } + } + + /** + * Tries to get previously calculated result + * or calculates duration + * @param durationEntry Duration entry. + */ + private long getDurationMillis(Duration durationEntry) { + return durationsStorage.computeIfAbsent(durationEntry, duration -> duration.getMillis(this)); + } + + /** Enum contains all tracked durations, it's calculation and comment */ + private enum Duration { + /** Total checkpoint duration. */ + TOTAL(tracker -> tracker.cpEnd - tracker.cpStart), + /** Since start due to write lock acquisition */ + BEFORE_LOCK(tracker -> tracker.cpLockWaitStart - tracker.cpStart), + /** Checkpoint lock wait duration. */ + LOCK_WAIT(tracker -> tracker.cpMarkStart - tracker.cpLockWaitStart), + /** Checkpoint lock hold duration */ + LOCK_HOLD(tracker -> tracker.cpLockRelease - tracker.cpMarkStart), + /** Lock wait and hold total duration */ + TOTAL_LOCK(tracker -> tracker.cpLockRelease - tracker.cpLockWaitStart), + /** Fire listeners under write lock duration */ + LISTENERS_EXECUTE(tracker -> tracker.listenersExecEnd - tracker.cpMarkStart), + /** Checkpoint mark duration */ + MARK(tracker -> tracker.cpMarkEnd - tracker.cpMarkStart), + /** Pages write duration */ + PAGES_WRITE(tracker -> tracker.cpFsyncStart - tracker.cpPagesWriteStart), + /** Checkpoint fsync duration */ + FSYNC(tracker -> tracker.cpEnd - tracker.cpFsyncStart), + /** Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. */ + WAL_CP_RECORD_FSYNC(tracker -> tracker.walCpRecordFsyncEnd - tracker.walCpRecordFsyncStart), + /** Duration of splitting and sorting checkpoint pages */ + SPLIT_AND_SORT_CP_PAGES(tracker -> tracker.splitAndSortCpPagesEnd - tracker.walCpRecordFsyncEnd), + /** Duration of writing recovery data */ + RECOVERY_DATA_WRITE(tracker -> tracker.cpRecoveryDataWriteEnd - tracker.cpMarkEnd), + /** + * Duration of checkpoint entry buffer writing to file. + * + * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) + */ + WRITE_CHECKPOINT_ENTRY(tracker -> tracker.cpMarkerStoreEnd - tracker.cpRecoveryDataWriteEnd); + + /** Duration nano time calculation function */ + private final Function durationNanosFunction; + + /** + * @param durationNanosFunction Duration nano time calculation function. + */ + Duration(Function durationNanosFunction) { + this.durationNanosFunction = durationNanosFunction; + } + + /** + * @param tracker Tracker. + */ + private Long getMillis(CheckpointMetricsTracker tracker) { + Long nanos = durationNanosFunction.apply(tracker); + return TimeUnit.NANOSECONDS.toMillis(nanos); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java index 3a508ca679558..f81566234aa21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriter.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java index a261823a69c82..69f663673a60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java index 47480dace142a..6f7ed7bb61cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointWorkflow.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.StorageException; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index 3d81fc6a032b2..406e438a49a74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -18,15 +18,10 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; import java.io.File; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; @@ -37,10 +32,7 @@ import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import com.sun.management.GcInfo; -import com.sun.management.internal.GarbageCollectorExtImpl; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -57,10 +49,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.failure.FailureProcessor; @@ -70,22 +60,17 @@ import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.WorkProgressDispatcher; import org.apache.ignite.internal.worker.WorkersRegistry; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.getBoolean; -import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; -import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD; import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP; @@ -109,38 +94,15 @@ */ @SuppressWarnings("NakedNotify") public class Checkpointer extends GridWorker { - /** Checkpoint started log message format. */ - private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" + - "checkpointId=%s, " + - "startPtr=%s, " + - "checkpointBeforeLockTime=%dms, " + - "checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, " + - "checkpointLockHoldTime=%dms, " + - "walCpRecordFsyncDuration=%dms, " + - "splitAndSortCpPagesDuration=%dms, " + - "writeRecoveryDataDuration=%dms, " + - "writeCheckpointEntryDuration=%dms, " + - "%s" + - "pages=%d, " + - "reason='%s']"; - /** Skip sync. */ private final boolean skipSync = getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC); /** Timeout between partition file destroy and checkpoint to handle it. */ private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds. - /** Jvm pause explain pattern. */ - private static final String JVM_PAUSE_EXPLAIN_PATTERN = "Possible JVM Pause explaination: [ %s ]"; - /** Avoid the start checkpoint if checkpointer was canceled. */ private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); - /** Long JVM pause threshold. */ - private final int longJvmPauseThreshold = - getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); - /** Pause detector. */ private final LongJVMPauseDetector pauseDetector; @@ -407,7 +369,7 @@ private void doCheckpoint() { Checkpoint chp = null; try { - CheckpointMetricsTracker tracker = new CheckpointMetricsTracker(); + CheckpointMetricsTracker tracker = new CheckpointMetricsTracker(log, pauseDetector); startCheckpointProgress(); @@ -426,31 +388,7 @@ private void doCheckpoint() { checkpointRecoveryFileStorage.clear(); if (writeRecoveryData) { - if (log.isInfoEnabled()) { - log.info(String.format("Checkpoint recovery data write started [" + - "checkpointId=%s, " + - "startPtr=%s, " + - "pages=%d, " + - "checkpointBeforeLockTime=%dms, " + - "checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, " + - "checkpointLockHoldTime=%dms, " + - "walCpRecordFsyncDuration=%dms, " + - "splitAndSortCpPagesDuration=%dms, " + - "reason='%s']", - chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), - chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(), - chp.pagesSize, - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.splitAndSortCpPagesDuration(), - chp.progress.reason() - )); - } - + tracker.logCheckPointRecoveryDataWriteStart(chp); recoveryDataSize = writeRecoveryData(chp); } } @@ -476,43 +414,12 @@ private void doCheckpoint() { updateHeartbeat(); if (chp.hasDelta()) { - if (log.isInfoEnabled()) { - log.info( - String.format( - CHECKPOINT_STARTED_LOG_FORMAT, - chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(), - chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(), - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.writeCheckpointEntryDuration(), - possibleLongJvmPauseExplaination(tracker), - chp.pagesSize, - chp.progress.reason() - ) - ); - } - + tracker.logCheckPointStart(chp); if (!writePages(tracker, chp.cpPages, chp.progress, this, this::isShutdownNow)) return; } else { - if (log.isInfoEnabled()) - LT.info(log, String.format( - "Skipping checkpoint (no pages were modified) [" + - "checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, " + - "checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']", - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.lockHoldDuration(), - chp.progress.reason()) - ); - + tracker.logCheckpointSkip(chp); tracker.onPagesWriteStart(); tracker.onFsyncStart(); } @@ -524,24 +431,11 @@ private void doCheckpoint() { tracker.onEnd(); - if (chp.hasDelta() || destroyedPartitionsCnt > 0) { - if (log.isInfoEnabled()) { - log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + - "walSegmentsCovered=%s, markDuration=%dms, recoveryWrite=%dms, pagesWrite=%dms, " + - "fsync=%dms, total=%dms]", - chp.cpEntry != null ? chp.cpEntry.checkpointId() : "", - chp.pagesSize, - chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "", - walRangeStr(chp.walSegsCoveredRange), - tracker.markDuration(), - tracker.recoveryDataWriteDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration())); - } - } + if (chp.hasDelta() || destroyedPartitionsCnt > 0) + tracker.logCheckpointFinish(chp); - updateMetrics(chp, tracker); + tracker.updatePerformanceStatistics(chp, psproc); + tracker.storeMetrics(chp, persStoreMetrics, cacheProcessor); } catch (IgniteCheckedException e) { chp.progress.fail(e); @@ -696,83 +590,6 @@ boolean writePages( return true; } - /** - * @param chp Checkpoint. - * @param tracker Tracker. - */ - private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) { - if (psproc.enabled()) { - psproc.checkpoint( - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.markDuration(), - tracker.lockHoldDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.writeCheckpointEntryDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.totalDuration(), - tracker.checkpointStartTime(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten()); - } - - if (persStoreMetrics.metricsEnabled()) { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cacheProcessor.context().database(); - - persStoreMetrics.onCheckpoint( - tracker.beforeLockDuration(), - tracker.lockWaitDuration(), - tracker.listenersExecuteDuration(), - tracker.markDuration(), - tracker.lockHoldDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.walCpRecordFsyncDuration(), - tracker.writeCheckpointEntryDuration(), - tracker.splitAndSortCpPagesDuration(), - tracker.recoveryDataWriteDuration(), - tracker.totalDuration(), - tracker.checkpointStartTime(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten(), - tracker.recoveryDataSize(), - dbMgr.forAllPageStores(PageStore::size), - dbMgr.forAllPageStores(PageStore::getSparseSize) - ); - } - } - - /** - * Creates a string of a range WAL segments. - * - * @param walRange Range of WAL segments. - * @return The message about how many WAL segments was between previous checkpoint and current one. - */ - private String walRangeStr(@Nullable IgniteBiTuple walRange) { - if (walRange == null) - return ""; - - String res; - - long startIdx = walRange.get1(); - long endIdx = walRange.get2(); - - if (endIdx < 0 || endIdx < startIdx) - res = "[]"; - else if (endIdx == startIdx) - res = "[" + endIdx + "]"; - else - res = "[" + startIdx + " - " + endIdx + "]"; - - return res; - } - /** * Processes all evicted partitions scheduled for destroy. * @@ -936,58 +753,6 @@ private void waitCheckpointEvent() { } } - /** - * @param tracker Checkpoint metrics tracker. - * @return Explain possible JVM pause. - */ - private String possibleLongJvmPauseExplaination(CheckpointMetricsTracker tracker) { - long lockDurationMillis = tracker.lockDurationMillis(); - if (LongJVMPauseDetector.enabled() && lockDurationMillis > longJvmPauseThreshold) { - List explains = new LinkedList<>(); - explains.add(String.format("Checkpoint lock took %d ms", lockDurationMillis)); - explains.addAll(getGCExplains(tracker)); - explains.add(pauseDetector.getTotalSpottedPausesExplain(tracker)); - return String.format(JVM_PAUSE_EXPLAIN_PATTERN, String.join("; ", explains)) + ", "; - } - return ""; - } - - /** - * @param tracker Checkpoint Tracker. - * @return Collection of every last GC if it happened within checkpoint process bounds - */ - private Collection getGCExplains(CheckpointMetricsTracker tracker) { - return ManagementFactory.getGarbageCollectorMXBeans() - .stream() - .filter(GarbageCollectorExtImpl.class::isInstance) - .map(GarbageCollectorExtImpl.class::cast) - .map(mxBean -> getGCExplain(mxBean, tracker)) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - } - - /** - * @param mxBean GC Mx bean. - * @param tracker Check point Tracker. - * @return GC explain if it happend within check point process bounds - */ - private Optional getGCExplain(GarbageCollectorExtImpl mxBean, CheckpointMetricsTracker tracker) { - String gcName = mxBean.getName(); - GcInfo lastGcInfo = mxBean.getLastGcInfo(); - RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); - long uptimeNanos = runtimeMXBean.getUptime() * 1_000_000L; - long checkPointStartNanos = tracker.checkPointStartNanos(); - long checkPointEndNanos = tracker.checkPointEndNanos(); - long monotonicStartTimeNanos = System.nanoTime() - uptimeNanos; - long monotonicGCStartTimeNanos = monotonicStartTimeNanos + lastGcInfo.getStartTime() * 1_000_000L; - long monotonicGCEndTimeNanos = monotonicStartTimeNanos + lastGcInfo.getEndTime() * 1_000_000L; - if (checkPointStartNanos >= monotonicGCEndTimeNanos || checkPointEndNanos <= monotonicGCStartTimeNanos) - return Optional.empty(); - long duration = lastGcInfo.getDuration(); - return Optional.of(String.format("%s most recent GC took %d ms", gcName, duration)); - } - /** * Update the current checkpoint info from the scheduled one. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java deleted file mode 100644 index dc2de3bffeefc..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointMetricsTracker.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.pagemem; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; -import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType; -import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage; -import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; - -/** - * Tracks various checkpoint phases and stats. - * - * Assumed sequence of events: - *
    - *
  1. Checkpoint start
  2. - *
  3. CP Lock wait start
  4. - *
  5. CP mark start
  6. - *
  7. CP Lock release
  8. - *
  9. Pages write start
  10. - *
  11. fsync start
  12. - *
  13. Checkpoint end
  14. - *
- */ -public class CheckpointMetricsTracker { - /** */ - private static final AtomicIntegerFieldUpdater DATA_PAGES_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "dataPages"); - - /** */ - private static final AtomicIntegerFieldUpdater COW_PAGES_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages"); - - /** */ - private volatile int dataPages; - - /** */ - private volatile int cowPages; - - /** */ - private final long cpStart = System.nanoTime(); - - /** */ - private long cpEnd; - - /** Epoch time of checkpoint process start */ - private final long cpStartEpochTime = System.currentTimeMillis(); - - /** */ - private long cpLockWaitStart; - - /** */ - private long cpMarkStart; - - /** */ - private long cpMarkEnd; - - /** */ - private long cpPagesWriteStart; - - /** */ - private long cpFsyncStart; - - /** */ - private long walCpRecordFsyncStart; - - /** */ - private long walCpRecordFsyncEnd; - - /** */ - private long cpRecoveryDataWriteEnd; - - /** */ - private long cpRecoveryDataSize; - - /** */ - private long totalDuration; - - /** */ - private long totalDurationNanos; - - /** */ - private long fsyncDuration; - - /** */ - private long writeCheckpointEntryDuration; - - /** */ - private long recoveryDataWriteDuration; - - /** */ - private long splitAndSortCpPagesDuration; - - /** */ - private long walCpRecordFsyncDuration; - - /** */ - private long pagesWriteDuration; - - /** */ - private long lockDurationMillis; - - /** */ - private long lockHoldDuration; - - /** */ - private long markDuration; - - /** */ - private long listenersExecDuration; - - /** */ - private long beforeLockDuration; - - /** */ - private long lockWaitDuration; - - /** - * Increments counter if copy on write page was written. - */ - public void onCowPageWritten() { - COW_PAGES_UPDATER.incrementAndGet(this); - } - - /** */ - public void onDataPageWritten() { - DATA_PAGES_UPDATER.incrementAndGet(this); - } - - /** - * @return COW pages. - */ - public int cowPagesWritten() { - return cowPages; - } - - /** - * @return Data pages written. - */ - public int dataPagesWritten() { - return dataPages; - } - - /** */ - public void onLockWaitStart() { - cpLockWaitStart = System.nanoTime(); - beforeLockDuration = TimeUnit.NANOSECONDS.toMillis(cpLockWaitStart - cpStart); - } - - /** */ - public void onMarkStart() { - cpMarkStart = System.nanoTime(); - lockWaitDuration = TimeUnit.NANOSECONDS.toMillis(cpMarkStart - cpLockWaitStart); - } - - /** */ - public void onMarkEnd() { - cpMarkEnd = System.nanoTime(); - markDuration = TimeUnit.NANOSECONDS.toMillis(cpMarkEnd - cpMarkStart); - } - - /** */ - public void onLockRelease() { - long nanoTime = System.nanoTime(); - lockHoldDuration = TimeUnit.NANOSECONDS.toMillis(nanoTime - cpMarkStart); - lockDurationMillis = TimeUnit.NANOSECONDS.toMillis(nanoTime - cpLockWaitStart); - } - - /** */ - public void onPagesWriteStart() { - cpPagesWriteStart = System.nanoTime(); - } - - /** */ - public void onFsyncStart() { - cpFsyncStart = System.nanoTime(); - pagesWriteDuration = TimeUnit.NANOSECONDS.toMillis(cpFsyncStart - cpPagesWriteStart); - } - - /** */ - public void onEnd() { - cpEnd = System.nanoTime(); - totalDurationNanos = cpEnd - cpStart; - totalDuration = TimeUnit.NANOSECONDS.toMillis(totalDurationNanos); - fsyncDuration = TimeUnit.NANOSECONDS.toMillis(cpEnd - cpFsyncStart); - } - - /** */ - public void onListenersExecuteEnd() { - listenersExecDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cpMarkStart); - } - - /** */ - public void onWalCpRecordFsyncStart() { - walCpRecordFsyncStart = System.nanoTime(); - } - - /** */ - public void onCpMarkerStoreEnd() { - writeCheckpointEntryDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cpRecoveryDataWriteEnd); - } - - /** */ - public void onSplitAndSortCpPagesEnd() { - splitAndSortCpPagesDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - walCpRecordFsyncEnd); - } - - /** */ - public void onWalCpRecordFsyncEnd() { - walCpRecordFsyncEnd = System.nanoTime(); - walCpRecordFsyncDuration = TimeUnit.NANOSECONDS.toMillis(walCpRecordFsyncEnd - walCpRecordFsyncStart); - } - - /** */ - public void onWriteRecoveryDataEnd(long recoveryDataSize) { - cpRecoveryDataSize = recoveryDataSize; - cpRecoveryDataWriteEnd = System.nanoTime(); - recoveryDataWriteDuration = TimeUnit.NANOSECONDS.toMillis(cpRecoveryDataWriteEnd - cpMarkEnd); - } - - /** - * @return Total checkpoint duration. - */ - public long totalDuration() { - return totalDuration; - } - - /** - * @return Total checkpoint duration (nanos). - */ - public long totalDurationNanos() { - return totalDurationNanos; - } - - /** - * @return Checkpoint lock wait duration. - */ - public long lockWaitDuration() { - return lockWaitDuration; - } - - /** - * @return Checkpoint action before taken write lock duration. - */ - public long beforeLockDuration() { - return beforeLockDuration; - } - - /** - * @return Execution listeners under write lock duration. - */ - public long listenersExecuteDuration() { - return listenersExecDuration; - } - - /** - * @return Checkpoint mark duration. - */ - public long markDuration() { - return markDuration; - } - - /** - * @return Checkpoint lock hold duration. - */ - public long lockHoldDuration() { - return lockHoldDuration; - } - - /** - * @return Checkpoint lock duration - */ - public long lockDurationMillis() { - return lockDurationMillis; - } - - /** - * @return Pages write duration. - */ - public long pagesWriteDuration() { - return pagesWriteDuration; - } - - /** - * @return Checkpoint fsync duration. - */ - public long fsyncDuration() { - return fsyncDuration; - } - - /** - * @return Duration of WAL fsync after logging {@link CheckpointRecord} on checkpoint begin. - */ - public long walCpRecordFsyncDuration() { - return walCpRecordFsyncDuration; - } - - /** - * @return Duration of splitting and sorting checkpoint pages. - */ - public long splitAndSortCpPagesDuration() { - return splitAndSortCpPagesDuration; - } - - /** - * @return Duration of writing recovery data. - */ - public long recoveryDataWriteDuration() { - return recoveryDataWriteDuration; - } - - /** - * @return Size of writing recovery data. - */ - public long recoveryDataSize() { - return cpRecoveryDataSize; - } - - /** - * @return Duration of checkpoint entry buffer writing to file. - * - * @see CheckpointMarkersStorage#writeCheckpointEntry(long, UUID, WALPointer, CheckpointRecord, CheckpointEntryType, boolean) - */ - public long writeCheckpointEntryDuration() { - return writeCheckpointEntryDuration; - } - - /** - * @return Checkpoint start time. - */ - public long checkpointStartTime() { - return cpStartEpochTime; - } - - /** - * @return monotonic time in nanos at check point process start - */ - public long checkPointStartNanos() { - return cpStart; - } - - /** - * @return monotonic time in nanos at check point process end - */ - public long checkPointEndNanos() { - return cpEnd; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 4fb148cd4d2dc..5ea09f3ede906 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; import org.apache.ignite.internal.util.function.ThrowableSupplier; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 9913aabd5cfa0..bc2c5b7c29221 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index ea72d1b9c1864..d714c33c2d539 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgressImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; From 0885a59060215d2dd0ec1bf86334e13f8c05d7c8 Mon Sep 17 00:00:00 2001 From: Egor Baranov Date: Mon, 1 Jun 2026 17:10:53 +0300 Subject: [PATCH 3/4] IGNITE-27232 Removed JVM metrics collection due to module import is required + minor check style objections are solved --- .../ignite/internal/LongJVMPauseDetector.java | 63 ++++++++--------- .../checkpoint/CheckpointMetricsTracker.java | 68 ++++--------------- 2 files changed, 46 insertions(+), 85 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java index 9b2467ce86161..cc3388606e0f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMetricsTracker; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -96,6 +95,7 @@ public class LongJVMPauseDetector { @GridToStringInclude private final long[] longPausesTimestamps = new long[EVT_CNT]; + /** Long pauses monotonic timestamps. */ @GridToStringExclude private final long[] longPausesMonotonicTimestamps = new long[EVT_CNT]; @@ -127,38 +127,39 @@ public void start() { if (log.isDebugEnabled()) log.debug(Thread.currentThread().getName() + " has been started."); while (true) { - try { - // don't worry, wait will release monitor and all props will be accessible - synchronized (this) { - lastWakeUpTimeNanos = getMonotonicTimeNanos(); - long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; - long awaitDeadlineMillis = awaitDeadline / 1_000_000L; - int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000); - while (getMonotonicTimeNanos() <= awaitDeadline) - wait(awaitDeadlineMillis, awaitDeadlineNanos); - long nanoTime = getMonotonicTimeNanos(); - long pause = nanoTime - awaitDeadline; - long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause); - if (pauseMillis >= THRESHOLD) { - log.warning("Possible too long JVM pause: " + pauseMillis + " ms. " + - "Precision: " + PRECISION + " ms."); - final int next = (int) (longPausesCnt++ % EVT_CNT); - longPausesTotalDurationNanos += pause; - longPausesTimestamps[next] = System.currentTimeMillis(); - longPausesMonotonicTimestamps[next] = nanoTime; - longPausesDurations[next] = pauseMillis; - } + try { + // don't worry, wait will release monitor and all props will be accessible + synchronized (this) { + lastWakeUpTimeNanos = getMonotonicTimeNanos(); + long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; + long awaitDeadlineMillis = awaitDeadline / 1_000_000L; + int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000); + while (getMonotonicTimeNanos() <= awaitDeadline) + wait(awaitDeadlineMillis, awaitDeadlineNanos); + long nanoTime = getMonotonicTimeNanos(); + long pause = nanoTime - awaitDeadline; + long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause); + if (pauseMillis >= THRESHOLD) { + log.warning("Possible too long JVM pause: " + pauseMillis + " ms. " + + "Precision: " + PRECISION + " ms."); + final int next = (int)(longPausesCnt++ % EVT_CNT); + longPausesTotalDurationNanos += pause; + longPausesTimestamps[next] = System.currentTimeMillis(); + longPausesMonotonicTimestamps[next] = nanoTime; + longPausesDurations[next] = pauseMillis; } - } catch (InterruptedException e) { - Thread locThread = Thread.currentThread(); + } + } + catch (InterruptedException e) { + Thread locThread = Thread.currentThread(); - if (workerRef.compareAndSet(locThread, null)) - log.error(locThread.getName() + " has been interrupted.", e); - else if (log.isDebugEnabled()) - log.debug(locThread.getName() + " has been stopped."); + if (workerRef.compareAndSet(locThread, null)) + log.error(locThread.getName() + " has been interrupted.", e); + else if (log.isDebugEnabled()) + log.debug(locThread.getName() + " has been stopped."); - break; - } + break; + } } }); @@ -225,7 +226,7 @@ synchronized Map longPauseEvents() { * or {@link Optional#empty()} if none were found */ public Optional getTotalSpottedPausesExplain(long cpStart) { - int lastPointer = (int) (longPausesCnt % EVT_CNT); + int lastPointer = (int)(longPausesCnt % EVT_CNT); int pausesSpottedTimes = 0; int curPointer = lastPointer; StringBuilder explainBuilder = new StringBuilder(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java index 9c2036a157ea3..989052e77f2f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java @@ -17,12 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence.checkpoint; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.util.Collection; import java.util.EnumMap; -import java.util.LinkedList; -import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -31,9 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Function; -import java.util.stream.Collectors; -import com.sun.management.GcInfo; -import com.sun.management.internal.GarbageCollectorExtImpl; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.LongJVMPauseDetector; import org.apache.ignite.internal.pagemem.store.PageStore; @@ -66,9 +58,6 @@ * */ public class CheckpointMetricsTracker { - /** Jvm pause explain pattern. */ - private static final String JVM_PAUSE_EXPLAIN_PATTERN = "Possible JVM Pause explaination: [ %s ]"; - /** Checkpoint started log message format. */ private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" + "checkpointId=%s, " + @@ -91,8 +80,10 @@ public class CheckpointMetricsTracker { Thread thread = new Thread(runnable); try { thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY - 1); thread.setName("check-point-metrics-export-thread"); - } catch (SecurityException ignored) { + } + catch (SecurityException ignored) { // do nothing } return thread; @@ -104,7 +95,8 @@ public class CheckpointMetricsTracker { CHECK_POINT_METRICS_EXPORT_EXECUTOR.shutdown(); try { CHECK_POINT_METRICS_EXPORT_EXECUTOR.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); @@ -119,7 +111,6 @@ public class CheckpointMetricsTracker { private static final AtomicIntegerFieldUpdater COW_PAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages"); - /** Long JVM pause threshold. */ private final int longJvmPauseThreshold = getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD); @@ -444,7 +435,7 @@ void updatePerformanceStatistics(Checkpoint chp, PerformanceStatisticsProcessor */ void storeMetrics(Checkpoint chp, DataStorageMetricsImpl persStoreMetrics, GridCacheProcessor cacheProc) { boolean metricsEnabled = persStoreMetrics.metricsEnabled(); - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager) cacheProc.context().database(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cacheProc.context().database(); int pagesSize = chp.pagesSize; long pageStoresSize = dbMgr.forAllPageStores(PageStore::size); long pageStoresSparseSize = dbMgr.forAllPageStores(PageStore::getSparseSize); @@ -502,54 +493,22 @@ else if (endIdx == startIdx) return res; } - /** * @return Explain possible JVM pause. */ private String possibleLongJvmPauseExplaination() { long lockDuration = getDurationMillis(Duration.TOTAL_LOCK); if (LongJVMPauseDetector.enabled() && lockDuration > longJvmPauseThreshold) { - List explains = new LinkedList<>(); - explains.add(String.format("Checkpoint lock took %d ms", lockDuration)); - explains.addAll(getGCExplains()); - pauseDetector.getTotalSpottedPausesExplain(cpStart).ifPresent(explains::add); - return String.format(JVM_PAUSE_EXPLAIN_PATTERN, String.join("; ", explains)) + ", "; + StringBuilder explainBuilder = new StringBuilder("Checkpoint lock took ") + .append(lockDuration).append(" ms, "); + Optional totalSpottedPausesExplain = pauseDetector.getTotalSpottedPausesExplain(cpStart); + totalSpottedPausesExplain.ifPresent(explainBuilder::append); + totalSpottedPausesExplain.ifPresent(ignored -> explainBuilder.append(", ")); + return explainBuilder.toString(); } return ""; } - /** - * @return Collection of every last GC if it happened within checkpoint process bounds - */ - private Collection getGCExplains() { - return ManagementFactory.getGarbageCollectorMXBeans() - .stream() - .filter(GarbageCollectorExtImpl.class::isInstance) - .map(GarbageCollectorExtImpl.class::cast) - .map(this::getGCExplain) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - } - - /** - * @param mxBean GC Mx bean. - * @return GC explain if it happend within check point process bounds - */ - private Optional getGCExplain(GarbageCollectorExtImpl mxBean) { - String gcName = mxBean.getName(); - GcInfo lastGcInfo = mxBean.getLastGcInfo(); - RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); - long uptimeNanos = runtimeMXBean.getUptime() * 1_000_000L; - long monotonicStartTimeNanos = System.nanoTime() - uptimeNanos; - long monotonicGCStartTimeNanos = monotonicStartTimeNanos + lastGcInfo.getStartTime() * 1_000_000L; - long monotonicGCEndTimeNanos = monotonicStartTimeNanos + lastGcInfo.getEndTime() * 1_000_000L; - if (cpStart >= monotonicGCEndTimeNanos || cpEnd <= monotonicGCStartTimeNanos) - return Optional.empty(); - long duration = lastGcInfo.getDuration(); - return Optional.of(String.format("%s most recent GC took %d ms", gcName, duration)); - } - /** * Tries to execute task, will not throw {@link java.util.concurrent.RejectedExecutionException} * @param runnable Runnable. @@ -557,7 +516,8 @@ private Optional getGCExplain(GarbageCollectorExtImpl mxBean) { private void invokeLater(Runnable runnable) { try { CHECK_POINT_METRICS_EXPORT_EXECUTOR.submit(runnable); - } catch (RejectedExecutionException ignored) { + } + catch (RejectedExecutionException ignored) { // do nothing, looks like it is the end } } From 921f13095cce3fc0ca7106e2c233eaa638c896d1 Mon Sep 17 00:00:00 2001 From: Egor Baranov Date: Tue, 2 Jun 2026 16:44:39 +0300 Subject: [PATCH 4/4] IGNITE-27232 Added test case and extended one. Minor bug fixes spotted during tests. --- .../GridCommandHandlerCheckpointTest.java | 22 +++++-- .../ignite/internal/LongJVMPauseDetector.java | 24 +++---- .../checkpoint/CheckpointMetricsTracker.java | 4 +- .../internal/LongJVMPauseDetectorTest.java | 63 +++++++++++++++++++ 4 files changed, 96 insertions(+), 17 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java index ecf9b907aa6ef..ab6cc39f41338 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerCheckpointTest.java @@ -108,6 +108,18 @@ else if (!persistenceEnable()) { /** Test checkpoint command with persistence enabled. */ @Test public void testCheckpointPersistenceCluster() throws Exception { + LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build(); + LogListener startMetricsLsnr = LogListener.matches(Pattern.compile( + "^.*Checkpoint started .*checkpointBeforeLockTime=\\d+ms.*$")).build(); + LogListener skipMetricsCheckpointLsnr = LogListener.matches(Pattern.compile( + "^.*Skipping checkpoint .*checkpointListenersExecuteTime=\\d+ms.*$")).build(); + LogListener finishMetricsCheckPointLsnr = LogListener.matches(Pattern.compile( + "^.*Checkpoint finished .*total=\\d+ms.*$")).build(); + listeningLog.registerListener(checkpointReasonLsnr); + listeningLog.registerListener(startMetricsLsnr); + listeningLog.registerListener(skipMetricsCheckpointLsnr); + listeningLog.registerListener(finishMetricsCheckPointLsnr); + persistenceEnable(true); IgniteEx srv = startGrids(2); @@ -133,12 +145,10 @@ public void testCheckpointPersistenceCluster() throws Exception { assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--reason", "test_reason")); - LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build(); - - listeningLog.registerListener(checkpointReasonLsnr); - assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000)); assertTrue(GridTestUtils.waitForCondition(checkpointReasonLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000)); assertFalse(testOut.toString().contains("persistence disabled")); outputContains(": Checkpoint started"); @@ -146,11 +156,15 @@ public void testCheckpointPersistenceCluster() throws Exception { testOut.reset(); checkpointFinishedLsnr.reset(); + startMetricsLsnr.reset(); + finishMetricsCheckPointLsnr.reset(); cacheCli.put(3, 3); assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--wait-for-finish")); assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000)); + assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000)); assertFalse(testOut.toString().contains("persistence disabled")); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java index cc3388606e0f7..49ef15acd26ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/LongJVMPauseDetector.java @@ -128,21 +128,23 @@ public void start() { log.debug(Thread.currentThread().getName() + " has been started."); while (true) { try { - // don't worry, wait will release monitor and all props will be accessible synchronized (this) { lastWakeUpTimeNanos = getMonotonicTimeNanos(); long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS; - long awaitDeadlineMillis = awaitDeadline / 1_000_000L; - int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000); - while (getMonotonicTimeNanos() <= awaitDeadline) - wait(awaitDeadlineMillis, awaitDeadlineNanos); + while (getMonotonicTimeNanos() < awaitDeadline) { + long monotonicTimeNanos = getMonotonicTimeNanos(); + long waitNanos = awaitDeadline - monotonicTimeNanos; + long waitMillis = Math.max(0, (long)Math.ceil(waitNanos / 1_000_000d)); + awaitDeadline = monotonicTimeNanos + (waitMillis * 1_000_000); + wait(waitMillis); + } long nanoTime = getMonotonicTimeNanos(); long pause = nanoTime - awaitDeadline; - long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause); + long pauseMillis = pause / 1_000_000; if (pauseMillis >= THRESHOLD) { - log.warning("Possible too long JVM pause: " + pauseMillis + " ms. " + - "Precision: " + PRECISION + " ms."); - final int next = (int)(longPausesCnt++ % EVT_CNT); + log.warning("Possible too long JVM pause: " + + "between " + pauseMillis + " and " + (pauseMillis + PRECISION) + " ms. "); + final int next = (int) (longPausesCnt++ % EVT_CNT); longPausesTotalDurationNanos += pause; longPausesTimestamps[next] = System.currentTimeMillis(); longPausesMonotonicTimestamps[next] = nanoTime; @@ -226,7 +228,7 @@ synchronized Map longPauseEvents() { * or {@link Optional#empty()} if none were found */ public Optional getTotalSpottedPausesExplain(long cpStart) { - int lastPointer = (int)(longPausesCnt % EVT_CNT); + int lastPointer = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT); int pausesSpottedTimes = 0; int curPointer = lastPointer; StringBuilder explainBuilder = new StringBuilder(); @@ -256,7 +258,7 @@ public Optional getTotalSpottedPausesExplain(long cpStart) { /** * @return monotonic time in nanos */ - protected static long getMonotonicTimeNanos() { + protected long getMonotonicTimeNanos() { return System.nanoTime(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java index 989052e77f2f5..2b75c3c6925d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMetricsTracker.java @@ -499,8 +499,8 @@ else if (endIdx == startIdx) private String possibleLongJvmPauseExplaination() { long lockDuration = getDurationMillis(Duration.TOTAL_LOCK); if (LongJVMPauseDetector.enabled() && lockDuration > longJvmPauseThreshold) { - StringBuilder explainBuilder = new StringBuilder("Checkpoint lock took ") - .append(lockDuration).append(" ms, "); + StringBuilder explainBuilder = new StringBuilder("Long JVM pause is spotted! Checkpoint lock took ") + .append(lockDuration).append("ms, "); Optional totalSpottedPausesExplain = pauseDetector.getTotalSpottedPausesExplain(cpStart); totalSpottedPausesExplain.ifPresent(explainBuilder::append); totalSpottedPausesExplain.ifPresent(ignored -> explainBuilder.append(", ")); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java index 45b436c8dabff..043f10fbb6a95 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/LongJVMPauseDetectorTest.java @@ -17,11 +17,16 @@ package org.apache.ignite.internal; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; import org.junit.Test; /** @@ -85,4 +90,62 @@ public void testStopWorkerThread() throws Exception { assertFalse(interruptLsnr.check()); assertTrue(stopLsnr.check()); } + + /** + * This test will create, check logic, + * check report gethering and gracefull shutdown + */ + @Test + public void testFullCycle() throws InterruptedException { + long[] monotonicTimes = new long[] { + 0, // init field + 100, // first renew of last wake-up time + 100, // check we didn't wake up spuriously + 100, // we need to get real wait time, so now time is + 50_000_100, // so we waited 50 ms, and previous time is 100 ns, so it's that time now + 550_000_100}; // something happend! Waited for 500 ms (equals to 500ms + previous nanotime)! + CountDownLatch cntDownLatch = new CountDownLatch(7); + LongJVMPauseDetector longJVMPauseDetector = getLongJVMPauseDetector(monotonicTimes, cntDownLatch); + longJVMPauseDetector.start(); + assertTrue(cntDownLatch.await(10, TimeUnit.SECONDS)); + assertEquals(500, longJVMPauseDetector.longPausesTotalDuration()); + assertEquals(1, longJVMPauseDetector.longPausesCount()); + Map longPauseEvts = longJVMPauseDetector.longPauseEvents(); + assertTrue(longPauseEvts.containsValue(500L)); + Optional spottedPausesExplain = longJVMPauseDetector.getTotalSpottedPausesExplain(100); + assertTrue(spottedPausesExplain.isPresent()); + assertTrue(spottedPausesExplain.get().matches("Pause detecor spotted 1 pauses: \\[500 ms at \\d+;]. Each with precision 50 ms")); + longJVMPauseDetector.stop(); + } + + /** + * @param monotonicTimes Monotonic times. Answers for method + * {@link org.apache.ignite.internal.LongJVMPauseDetector#getMonotonicTimeNanos()} + * @param cntDownLatch Count down latch. + */ + private @NotNull LongJVMPauseDetector getLongJVMPauseDetector(long[] monotonicTimes, CountDownLatch cntDownLatch) { + return new LongJVMPauseDetector("test-instance", listeningTestLogger) { + private int visitCounter; + /** {@inheritDoc} */ + @Override protected long getMonotonicTimeNanos() { + synchronized (this) { + visitCounter++; + if (visitCounter - 1 >= monotonicTimes.length) { + cntDownLatch.countDown(); + while (!Thread.currentThread().isInterrupted()) { + try { + wait(Long.MAX_VALUE); + } + catch (InterruptedException ignored) { + // do nothing + } + } + return 0; + } + cntDownLatch.countDown(); + return monotonicTimes[visitCounter - 1]; + } + } + }; + } }