Skip to content

[fix][broker] Prevent replicator from getting stuck when dispatch rate limiter has no permits#26005

Open
void-ptr974 wants to merge 1 commit into
apache:masterfrom
void-ptr974:fix/replicator-rate-limiter-inflight
Open

[fix][broker] Prevent replicator from getting stuck when dispatch rate limiter has no permits#26005
void-ptr974 wants to merge 1 commit into
apache:masterfrom
void-ptr974:fix/replicator-rate-limiter-inflight

Conversation

@void-ptr974

Copy link
Copy Markdown
Contributor

Motivation

PersistentReplicator.readMoreEntries() created an InFlightTask before checking the replicator dispatch rate limiter.

When replicator dispatch throttling was enabled and the rate limiter had no message or byte permits, the method scheduled a retry and returned without issuing cursor.asyncReadEntriesOrWait(...). The newly-created task stayed in entries == null, so it looked like a pending cursor read even though no read request existed.

On the next retry, hasPendingRead() returned true and the replicator stopped scheduling further reads. This could leave geo-replication stuck with backlog when replicator dispatch throttling is enabled.

Modifications

Compute producer and rate-limiter permits before creating the in-flight read task.

Create the InFlightTask only after confirming that a real cursor read will be issued. This preserves the invariant that an entries == null in-flight task corresponds to an actual pending cursor read.

Added a unit test covering the no-permit rate-limiter path for both message and byte throttling, verifying that no pending in-flight read task is created.

Verifying this change

This change added tests and can be verified as follows:

  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testRateLimiterWithoutPermitsDoesNotCreateInFlightTask
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.PersistentReplicatorInflightTaskTest.testAcquirePermitsIfNotFetchingSchema
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterMessageReceivedAllMessages
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.ReplicatorRateLimiterTest.testReplicatorRateLimiterByBytes

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@void-ptr974 void-ptr974 force-pushed the fix/replicator-rate-limiter-inflight branch from 7fed1bd to 16ad2d7 Compare June 12, 2026 01:59
@void-ptr974 void-ptr974 force-pushed the fix/replicator-rate-limiter-inflight branch from 16ad2d7 to 25ed1ac Compare June 12, 2026 02:29
@void-ptr974 void-ptr974 marked this pull request as ready for review June 12, 2026 02:38
@void-ptr974

Copy link
Copy Markdown
Contributor Author

Hi @poorbarcode, this one adjusts when PersistentReplicator creates in-flight read tasks under dispatch rate limiting. Could you help review the pending-read / permit invariant when you have time? Thanks!

Comment on lines +356 to +360
if (!isWritable()) {
log.debug("Throttling replication traffic because producer is not writable");
// Minimize the read size if the producer is disconnected or the window is already full
messagesToRead = 1;
}

@lhotari lhotari Jun 15, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this changes the behavior of the existing code when it's moved here.

There's also a bug in the existing code. This check should be done before acquiring permits. It doesn't make sense that permits and rate limit is calculated based on a different amount than what is used.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional problem is that there's not a way to specify the Netty watermark limits which are used to determine the "writability" of a Netty channel. By default it's a very small value and it could be useful to allow more buffering than the Netty defaults allow.

@lhotari lhotari added the triage/lhotari/important lhotari's triaging label for important issues or PRs label Jun 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

triage/lhotari/important lhotari's triaging label for important issues or PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants