diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8433d9b9c7ba2..5b1773b60829d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -240,6 +240,17 @@ public boolean isReadable() { } } + @VisibleForTesting + static class ReadEntriesRequest { + private final InFlightTask inFlightTask; + private final long bytesToRead; + + ReadEntriesRequest(InFlightTask inFlightTask, long bytesToRead) { + this.inFlightTask = inFlightTask; + this.bytesToRead = bytesToRead; + } + } + /** * Calculate available permits for read entries. */ @@ -287,9 +298,8 @@ protected void readMoreEntries() { if (state.equals(Terminated) || state.equals(Terminating)) { return; } - // Acquire permits and check state of producer. - InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema(); - if (newInFlightTask == null) { + ReadEntriesRequest request = acquireReadEntriesRequestIfNeeded(); + if (request == null) { // no permits from rate limit log.debug("Not scheduling read due to pending read or no permits"); if (!hasPendingRead()) { @@ -300,44 +310,60 @@ protected void readMoreEntries() { return; } } - // If disabled RateLimiter. - if (!dispatchRateLimiter.isPresent() || !dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { - cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, -1, this, - newInFlightTask/* Context object */, topic.getMaxReadPosition()); - return; - } - // No permits of RateLimiter. - AvailablePermits availablePermits = getRateLimiterAvailablePermits(newInFlightTask.readingEntries); - if (!availablePermits.isReadable()) { - // no rate limiter permits from rate limit - log.debug() - .attr("messages", availablePermits.getMessages()) - .attr("bytes", availablePermits.getBytes()) - .log("Throttling replication traffic"); - topic.getBrokerService().executor().schedule( - () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); - return; - } - // Has permits of RateLimiter. - int messagesToRead = availablePermits.getMessages(); - long bytesToRead = availablePermits.getBytes(); - if (!isWritable()) { - log.debug("Throttling replication traffic because producer is not writable"); - // Minimize the read size if the producer is disconnected or the window is already full - messagesToRead = 1; - } - // Update acquired permits exceeds limitation. - if (messagesToRead < newInFlightTask.readingEntries) { - newInFlightTask.setReadingEntries(messagesToRead); - } + InFlightTask newInFlightTask = request.inFlightTask; log.debug() .attr("readingEntries", newInFlightTask.readingEntries) - .attr("bytesToRead", bytesToRead) + .attr("bytesToRead", request.bytesToRead) .log("Scheduling read"); - cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, bytesToRead, this, + cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, request.bytesToRead, this, newInFlightTask/* Context object */, topic.getMaxReadPosition()); } + @VisibleForTesting + ReadEntriesRequest acquireReadEntriesRequestIfNeeded() { + synchronized (inFlightTasks) { + if (hasPendingRead()) { + log.info("Skip the reading because there is a pending read task"); + return null; + } + if (waitForCursorRewindingRefCnf > 0) { + log.info("Skip the reading due to new detected schema"); + return null; + } + if (state != Started) { + log.info("Skip the reading because producer has not started"); + return null; + } + int permits = getPermitsIfNoPendingRead(); + if (permits == 0) { + return null; + } + + int messagesToRead = permits; + long bytesToRead = -1; + if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { + AvailablePermits availablePermits = getRateLimiterAvailablePermits(permits); + if (!availablePermits.isReadable()) { + // no rate limiter permits from rate limit + log.debug() + .attr("messages", availablePermits.getMessages()) + .attr("bytes", availablePermits.getBytes()) + .log("Throttling replication traffic"); + return null; + } + messagesToRead = availablePermits.getMessages(); + bytesToRead = availablePermits.getBytes(); + if (!isWritable()) { + log.debug("Throttling replication traffic because producer is not writable"); + // Minimize the read size if the producer is disconnected or the window is already full + messagesToRead = 1; + } + } + return new ReadEntriesRequest( + createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), messagesToRead), bytesToRead); + } + } + @Override public void readEntriesComplete(List entries, Object ctx) { log.debug() @@ -871,26 +897,8 @@ InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int readingE } protected InFlightTask acquirePermitsIfNotFetchingSchema() { - synchronized (inFlightTasks) { - if (hasPendingRead()) { - log.info("Skip the reading because there is a pending read task"); - return null; - } - if (waitForCursorRewindingRefCnf > 0) { - log.info("Skip the reading due to new detected schema"); - return null; - } - if (state != Started) { - log.info("Skip the reading because producer has not started"); - return null; - } - // Guarantee that there is a unique cursor reading task. - int permits = getPermitsIfNoPendingRead(); - if (permits == 0) { - return null; - } - return createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits); - } + ReadEntriesRequest request = acquireReadEntriesRequestIfNeeded(); + return request == null ? null : request.inFlightTask; } protected int getPermitsIfNoPendingRead() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java index 478437ffde015..7c964fcd76cde 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import java.util.ArrayList; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -167,6 +169,39 @@ public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated( } } + @Test + public void testRateLimiterWithoutPermitsDoesNotCreateInFlightTask() throws Exception { + assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(0, -1); + assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(-1, 0); + } + + private void assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(long availableMessages, + long availableBytes) throws Exception { + PersistentReplicator replicator = getReplicator(topicName); + + LinkedList inFlightTasks = replicator.inFlightTasks; + List originalTasks = new ArrayList<>(inFlightTasks); + Optional originalRateLimiter = replicator.dispatchRateLimiter; + inFlightTasks.clear(); + + DispatchRateLimiter rateLimiter = mock(DispatchRateLimiter.class); + when(rateLimiter.isDispatchRateLimitingEnabled()).thenReturn(true); + when(rateLimiter.getAvailableDispatchRateLimitOnMsg()).thenReturn(availableMessages); + when(rateLimiter.getAvailableDispatchRateLimitOnByte()).thenReturn(availableBytes); + replicator.dispatchRateLimiter = Optional.of(rateLimiter); + + try { + Assert.assertNull(replicator.acquireReadEntriesRequestIfNeeded()); + Assert.assertTrue(inFlightTasks.isEmpty()); + Assert.assertFalse(replicator.hasPendingRead()); + assertEquals(replicator.getPermitsIfNoPendingRead(), 1000); + } finally { + inFlightTasks.clear(); + inFlightTasks.addAll(originalTasks); + replicator.dispatchRateLimiter = originalRateLimiter; + } + } + @Test public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception { log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");