diff --git a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java index d4ff33a37..f5d85aa10 100644 --- a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java +++ b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java @@ -18,7 +18,7 @@ class Buffer { private static final long bufferActiveBit = 1L << 63; - private final AtomicLong observationCount = new AtomicLong(0); + private final AtomicLong[] stripedObservationCounts; private double[] observationBuffer = new double[0]; private int bufferPos = 0; private boolean reset = false; @@ -27,8 +27,18 @@ class Buffer { ReentrantLock runLock = new ReentrantLock(); Condition bufferFilled = appendLock.newCondition(); + Buffer() { + stripedObservationCounts = new AtomicLong[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < stripedObservationCounts.length; i++) { + stripedObservationCounts[i] = new AtomicLong(0); + } + } + boolean append(double value) { - long count = observationCount.incrementAndGet(); + AtomicLong observationCountForThread = + stripedObservationCounts[ + ((int) Thread.currentThread().getId()) % stripedObservationCounts.length]; + long count = observationCountForThread.incrementAndGet(); if ((count & bufferActiveBit) == 0) { return false; // sign bit not set -> buffer not active. } else { @@ -69,7 +79,10 @@ T run( runLock.lock(); try { // Signal that the buffer is active. - Long expectedCount = observationCount.getAndAdd(bufferActiveBit); + Long expectedCount = 0L; + for (AtomicLong observationCount : stripedObservationCounts) { + expectedCount += observationCount.getAndAdd(bufferActiveBit); + } while (!complete.apply(expectedCount)) { // Wait until all in-flight threads have added their observations to the histogram / @@ -81,14 +94,18 @@ T run( result = createResult.get(); // Signal that the buffer is inactive. - int expectedBufferSize; + long expectedBufferSize = 0; if (reset) { - expectedBufferSize = - (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount); + for (AtomicLong observationCount : stripedObservationCounts) { + expectedBufferSize += (int) (observationCount.getAndSet(0) & ~bufferActiveBit); + } reset = false; } else { - expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount); + for (AtomicLong observationCount : stripedObservationCounts) { + expectedBufferSize += (int) observationCount.addAndGet(bufferActiveBit); + } } + expectedBufferSize -= expectedCount; appendLock.lock(); try {