Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ else if (!persistenceEnable()) {
/** Test checkpoint command with persistence enabled. */
@Test
public void testCheckpointPersistenceCluster() throws Exception {
LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build();
LogListener startMetricsLsnr = LogListener.matches(Pattern.compile(
"^.*Checkpoint started .*checkpointBeforeLockTime=\\d+ms.*$")).build();
LogListener skipMetricsCheckpointLsnr = LogListener.matches(Pattern.compile(
"^.*Skipping checkpoint .*checkpointListenersExecuteTime=\\d+ms.*$")).build();
LogListener finishMetricsCheckPointLsnr = LogListener.matches(Pattern.compile(
"^.*Checkpoint finished .*total=\\d+ms.*$")).build();
listeningLog.registerListener(checkpointReasonLsnr);
listeningLog.registerListener(startMetricsLsnr);
listeningLog.registerListener(skipMetricsCheckpointLsnr);
listeningLog.registerListener(finishMetricsCheckPointLsnr);

persistenceEnable(true);

IgniteEx srv = startGrids(2);
Expand All @@ -133,24 +145,26 @@ public void testCheckpointPersistenceCluster() throws Exception {

assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--reason", "test_reason"));

LogListener checkpointReasonLsnr = LogListener.matches("reason='test_reason'").build();

listeningLog.registerListener(checkpointReasonLsnr);

assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000));
assertTrue(GridTestUtils.waitForCondition(checkpointReasonLsnr::check, 10_000));
assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000));
assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000));
assertFalse(testOut.toString().contains("persistence disabled"));

outputContains(": Checkpoint started");

testOut.reset();

checkpointFinishedLsnr.reset();
startMetricsLsnr.reset();
finishMetricsCheckPointLsnr.reset();

cacheCli.put(3, 3);

assertEquals(EXIT_CODE_OK, execute("--checkpoint", "--wait-for-finish"));
assertTrue(GridTestUtils.waitForCondition(checkpointFinishedLsnr::check, 10_000));
assertTrue(GridTestUtils.waitForCondition(startMetricsLsnr::check, 10_000));
assertTrue(GridTestUtils.waitForCondition(finishMetricsCheckPointLsnr::check, 10_000));
assertFalse(testOut.toString().contains("persistence disabled"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
package org.apache.ignite.internal;

import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_DISABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
Expand Down Expand Up @@ -56,9 +56,12 @@ public class LongJVMPauseDetector {
public static final int DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT = 20;

/** Precision. */
private static final int PRECISION =
private static final long PRECISION =
getInteger(IGNITE_JVM_PAUSE_DETECTOR_PRECISION, DFLT_JVM_PAUSE_DETECTOR_PRECISION);

/** Precision. */
private static final long PRECISION_NANOS = PRECISION * 1_000_000L;

/** Threshold. */
private static final int THRESHOLD =
getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
Expand All @@ -83,15 +86,19 @@ public class LongJVMPauseDetector {
private long longPausesCnt;

/** Long pause total duration. */
private long longPausesTotalDuration;
private long longPausesTotalDurationNanos;

/** Last detector's wake up time. */
private long lastWakeUpTime;
private long lastWakeUpTimeNanos = getMonotonicTimeNanos();

/** Long pauses timestamps. */
@GridToStringInclude
private final long[] longPausesTimestamps = new long[EVT_CNT];

/** Long pauses monotonic timestamps. */
@GridToStringExclude
private final long[] longPausesMonotonicTimestamps = new long[EVT_CNT];

/** Long pauses durations. */
@GridToStringInclude
private final long[] longPausesDurations = new long[EVT_CNT];
Expand All @@ -117,40 +124,31 @@ public void start() {
}

final Thread worker = new IgniteThread(igniteInstanceName, "jvm-pause-detector-worker", () -> {
synchronized (this) {
lastWakeUpTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}

if (log.isDebugEnabled())
log.debug(Thread.currentThread().getName() + " has been started.");

while (true) {
try {
Thread.sleep(PRECISION);

final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
final long pause = now - PRECISION - lastWakeUpTime;

if (pause >= THRESHOLD) {
log.warning("Possible too long JVM pause: " + pause + " milliseconds.");

synchronized (this) {
final int next = (int)(longPausesCnt % EVT_CNT);

longPausesCnt++;

longPausesTotalDuration += pause;

longPausesTimestamps[next] = now;

longPausesDurations[next] = pause;

lastWakeUpTime = now;
synchronized (this) {
lastWakeUpTimeNanos = getMonotonicTimeNanos();
long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS;
while (getMonotonicTimeNanos() < awaitDeadline) {
long monotonicTimeNanos = getMonotonicTimeNanos();
long waitNanos = awaitDeadline - monotonicTimeNanos;
long waitMillis = Math.max(0, (long)Math.ceil(waitNanos / 1_000_000d));
awaitDeadline = monotonicTimeNanos + (waitMillis * 1_000_000);
wait(waitMillis);
}
}
else {
synchronized (this) {
lastWakeUpTime = now;
long nanoTime = getMonotonicTimeNanos();
long pause = nanoTime - awaitDeadline;
long pauseMillis = pause / 1_000_000;
if (pauseMillis >= THRESHOLD) {
log.warning("Possible too long JVM pause: " +
"between " + pauseMillis + " and " + (pauseMillis + PRECISION) + " ms. ");
final int next = (int) (longPausesCnt++ % EVT_CNT);
longPausesTotalDurationNanos += pause;
longPausesTimestamps[next] = System.currentTimeMillis();
longPausesMonotonicTimestamps[next] = nanoTime;
longPausesDurations[next] = pauseMillis;
}
}
}
Expand Down Expand Up @@ -209,14 +207,7 @@ synchronized long longPausesCount() {
* @return Long JVM pauses total duration.
*/
synchronized long longPausesTotalDuration() {
return longPausesTotalDuration;
}

/**
* @return Last checker's wake up time.
*/
public synchronized long getLastWakeUpTime() {
return lastWakeUpTime;
return TimeUnit.NANOSECONDS.toMillis(longPausesTotalDurationNanos);
}

/**
Expand All @@ -232,20 +223,42 @@ synchronized Map<Long, Long> longPauseEvents() {
}

/**
* @return Pair ({@code last long pause event time}, {@code pause time duration}) or {@code null}, if long pause
* wasn't occurred.
* @param cpStart Check point start time in nanos.
* @return Tries to explain total pauses spotted during check point process
* or {@link Optional#empty()} if none were found
*/
public synchronized @Nullable IgniteBiTuple<Long, Long> getLastLongPause() {
int lastPauseIdx = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT);

if (longPausesTimestamps[lastPauseIdx] == 0)
return null;

return new IgniteBiTuple<>(longPausesTimestamps[lastPauseIdx], longPausesDurations[lastPauseIdx]);
public Optional<String> getTotalSpottedPausesExplain(long cpStart) {
int lastPointer = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT);
int pausesSpottedTimes = 0;
int curPointer = lastPointer;
StringBuilder explainBuilder = new StringBuilder();
synchronized (this) {
do {
if (longPausesMonotonicTimestamps[curPointer] <= cpStart)
break;
explainBuilder.append(longPausesDurations[curPointer]).append(" ms at ")
.append(longPausesTimestamps[curPointer]).append(";");
pausesSpottedTimes++;
curPointer = curPointer == 0 ? EVT_CNT - 1 : curPointer - 1;
} while (curPointer != lastPointer);
}
return pausesSpottedTimes == 0 ?
Optional.empty() :
Optional.of(String.format("Pause detecor spotted %d pauses: [%s]. Each with precision %d ms",
pausesSpottedTimes,
explainBuilder,
PRECISION));
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LongJVMPauseDetector.class, this);
}

/**
* @return monotonic time in nanos
*/
protected long getMonotonicTimeNanos() {
return System.nanoTime();
}
}
Loading