Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,17 @@ public boolean isReadable() {
}
}

@VisibleForTesting
static class ReadEntriesRequest {
private final InFlightTask inFlightTask;
private final long bytesToRead;

ReadEntriesRequest(InFlightTask inFlightTask, long bytesToRead) {
this.inFlightTask = inFlightTask;
this.bytesToRead = bytesToRead;
}
}

/**
* Calculate available permits for read entries.
*/
Expand Down Expand Up @@ -287,9 +298,8 @@ protected void readMoreEntries() {
if (state.equals(Terminated) || state.equals(Terminating)) {
return;
}
// Acquire permits and check state of producer.
InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
if (newInFlightTask == null) {
ReadEntriesRequest request = acquireReadEntriesRequestIfNeeded();
if (request == null) {
// no permits from rate limit
log.debug("Not scheduling read due to pending read or no permits");
if (!hasPendingRead()) {
Expand All @@ -300,44 +310,60 @@ protected void readMoreEntries() {
return;
}
}
// If disabled RateLimiter.
if (!dispatchRateLimiter.isPresent() || !dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, -1, this,
newInFlightTask/* Context object */, topic.getMaxReadPosition());
return;
}
// No permits of RateLimiter.
AvailablePermits availablePermits = getRateLimiterAvailablePermits(newInFlightTask.readingEntries);
if (!availablePermits.isReadable()) {
// no rate limiter permits from rate limit
log.debug()
.attr("messages", availablePermits.getMessages())
.attr("bytes", availablePermits.getBytes())
.log("Throttling replication traffic");
topic.getBrokerService().executor().schedule(
() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return;
}
// Has permits of RateLimiter.
int messagesToRead = availablePermits.getMessages();
long bytesToRead = availablePermits.getBytes();
if (!isWritable()) {
log.debug("Throttling replication traffic because producer is not writable");
// Minimize the read size if the producer is disconnected or the window is already full
messagesToRead = 1;
}
// Update acquired permits exceeds limitation.
if (messagesToRead < newInFlightTask.readingEntries) {
newInFlightTask.setReadingEntries(messagesToRead);
}
InFlightTask newInFlightTask = request.inFlightTask;
log.debug()
.attr("readingEntries", newInFlightTask.readingEntries)
.attr("bytesToRead", bytesToRead)
.attr("bytesToRead", request.bytesToRead)
.log("Scheduling read");
cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, bytesToRead, this,
cursor.asyncReadEntriesOrWait(newInFlightTask.readingEntries, request.bytesToRead, this,
newInFlightTask/* Context object */, topic.getMaxReadPosition());
}

@VisibleForTesting
ReadEntriesRequest acquireReadEntriesRequestIfNeeded() {
synchronized (inFlightTasks) {
if (hasPendingRead()) {
log.info("Skip the reading because there is a pending read task");
return null;
}
if (waitForCursorRewindingRefCnf > 0) {
log.info("Skip the reading due to new detected schema");
return null;
}
if (state != Started) {
log.info("Skip the reading because producer has not started");
return null;
}
int permits = getPermitsIfNoPendingRead();
if (permits == 0) {
return null;
}

int messagesToRead = permits;
long bytesToRead = -1;
if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
AvailablePermits availablePermits = getRateLimiterAvailablePermits(permits);
if (!availablePermits.isReadable()) {
// no rate limiter permits from rate limit
log.debug()
.attr("messages", availablePermits.getMessages())
.attr("bytes", availablePermits.getBytes())
.log("Throttling replication traffic");
return null;
}
messagesToRead = availablePermits.getMessages();
bytesToRead = availablePermits.getBytes();
if (!isWritable()) {
log.debug("Throttling replication traffic because producer is not writable");
// Minimize the read size if the producer is disconnected or the window is already full
messagesToRead = 1;
}
Comment on lines +356 to +360

@lhotari lhotari Jun 15, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this changes the behavior of the existing code when it's moved here.

There's also a bug in the existing code. This check should be done before acquiring permits. It doesn't make sense that permits and rate limit is calculated based on a different amount than what is used.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional problem is that there's not a way to specify the Netty watermark limits which are used to determine the "writability" of a Netty channel. By default it's a very small value and it could be useful to allow more buffering than the Netty defaults allow.

}
return new ReadEntriesRequest(
createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), messagesToRead), bytesToRead);
}
}

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug()
Expand Down Expand Up @@ -871,26 +897,8 @@ InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int readingE
}

protected InFlightTask acquirePermitsIfNotFetchingSchema() {
synchronized (inFlightTasks) {
if (hasPendingRead()) {
log.info("Skip the reading because there is a pending read task");
return null;
}
if (waitForCursorRewindingRefCnf > 0) {
log.info("Skip the reading due to new detected schema");
return null;
}
if (state != Started) {
log.info("Skip the reading because producer has not started");
return null;
}
// Guarantee that there is a unique cursor reading task.
int permits = getPermitsIfNoPendingRead();
if (permits == 0) {
return null;
}
return createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits);
}
ReadEntriesRequest request = acquireReadEntriesRequestIfNeeded();
return request == null ? null : request.inFlightTask;
}

protected int getPermitsIfNoPendingRead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -167,6 +169,39 @@ public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated(
}
}

@Test
public void testRateLimiterWithoutPermitsDoesNotCreateInFlightTask() throws Exception {
assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(0, -1);
assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(-1, 0);
}

private void assertRateLimiterWithoutPermitsDoesNotCreateInFlightTask(long availableMessages,
long availableBytes) throws Exception {
PersistentReplicator replicator = getReplicator(topicName);

LinkedList<InFlightTask> inFlightTasks = replicator.inFlightTasks;
List<InFlightTask> originalTasks = new ArrayList<>(inFlightTasks);
Optional<DispatchRateLimiter> originalRateLimiter = replicator.dispatchRateLimiter;
inFlightTasks.clear();

DispatchRateLimiter rateLimiter = mock(DispatchRateLimiter.class);
when(rateLimiter.isDispatchRateLimitingEnabled()).thenReturn(true);
when(rateLimiter.getAvailableDispatchRateLimitOnMsg()).thenReturn(availableMessages);
when(rateLimiter.getAvailableDispatchRateLimitOnByte()).thenReturn(availableBytes);
replicator.dispatchRateLimiter = Optional.of(rateLimiter);

try {
Assert.assertNull(replicator.acquireReadEntriesRequestIfNeeded());
Assert.assertTrue(inFlightTasks.isEmpty());
Assert.assertFalse(replicator.hasPendingRead());
assertEquals(replicator.getPermitsIfNoPendingRead(), 1000);
} finally {
inFlightTasks.clear();
inFlightTasks.addAll(originalTasks);
replicator.dispatchRateLimiter = originalRateLimiter;
}
}

@Test
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");
Expand Down