Skip to content

[fix][broker]Do not trigger topic GC if replication is still active#25915

Open
poorbarcode wants to merge 14 commits into
apache:masterfrom
poorbarcode:fix/topic_gc_replication
Open

[fix][broker]Do not trigger topic GC if replication is still active#25915
poorbarcode wants to merge 14 commits into
apache:masterfrom
poorbarcode:fix/topic_gc_replication

Conversation

@poorbarcode

@poorbarcode poorbarcode commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Motivation

Replication is stuck due to the Topic GC

  • Enable binary way replication between the primary cluster and backup cluster
  • primary cluster: Publishing messages into primary cluster
    • backup cluster: No consumer/producer is registered.
  • backup cluster: Check topic GC
    • No subscriptions
    • No producers except remote producer
    • The topic should be GC
      • Disable replication
      • The topic will not be deleted since the remote-side producer is still registered
  • backup cluster: the topic GC progress is waiting for the remote-producer to be disconnected.
    • It will not be executed since no one wants to delete the topic.
  • backup cluster: backlog increases because the replicator was closed
    • Although the messages copied from the remote end will not be copied back repeatedly, the replicator still needs to perform a check and then mark delete.

Modifications

  • Topic GC will only be triggered if there is no producer(includes remote producer) and no replicator
  • Replicator producer will be closed after there are no messages to be replicated anymore.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@poorbarcode poorbarcode added this to the 5.0.0-M1 milestone Jun 1, 2026
@poorbarcode poorbarcode self-assigned this Jun 1, 2026
@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/4.2.2 release/4.0.11 ready-to-test labels Jun 1, 2026

@codelipenghui codelipenghui left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode I'm thinking another solution

  1. Close the replicator producer if the producer is idle for a while (e.g. 10 mins)
  2. Check all the producers for detecting the inactive topic (now it's more complicated to skip the replicator producer)
  3. Only delete the inactive topic if there is no producers (including the replicator producer)

Now, your solution added 7 days delay for inactive topic deletion if geo-replication enable. But if there is no messages from last 7 days, the issue can still happen, right?

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@void-ptr974

Copy link
Copy Markdown
Contributor

@poorbarcode I'm thinking another solution

  1. Close the replicator producer if the producer is idle for a while (e.g. 10 mins)
  2. Check all the producers for detecting the inactive topic (now it's more complicated to skip the replicator producer)
  3. Only delete the inactive topic if there is no producers (including the replicator producer)

Now, your solution added 7 days delay for inactive topic deletion if geo-replication enable. But if there is no messages from last 7 days, the issue can still happen, right?

I agree with this concern. A replicated topic can be quiet for longer than the threshold while the replication relationship is still valid.

Using latestPublishTime here makes topic deletion depend on the traffic pattern instead of the producer lifecycle. It seems cleaner to let the replicator close its producer explicitly when it is really idle, and let topic GC only check whether producers still exist.

@poorbarcode

Copy link
Copy Markdown
Contributor Author

@codelipenghui @void-ptr974 Changed the solution as @codelipenghui suggested, please review again

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@poorbarcode poorbarcode requested a review from lhotari June 3, 2026 08:58
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@poorbarcode poorbarcode requested a review from void-ptr974 June 11, 2026 09:47
@poorbarcode

Copy link
Copy Markdown
Contributor Author

@codelipenghui @lhotari @void-ptr974

Please review the PR again. I have changed the solution:

  • Topic GC will only be triggered if there is no producer(includes remote producer) and no replicator
  • Replicator producer will be closed after there are no messages to be replicated, it does not rely on the Persistent Topic anymore

@poorbarcode poorbarcode requested review from codelipenghui, lhotari and void-ptr974 and removed request for codelipenghui, lhotari and void-ptr974 June 11, 2026 09:49
@poorbarcode poorbarcode requested a review from void-ptr974 June 15, 2026 06:25
void-ptr974

This comment was marked as duplicate.

Comment on lines +422 to +426
// Start producer and retry.
if (state == Disconnected) {
startProducer();
retryReplicateEntries.run();
return;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BUG] Auto-resume can replicate newer positions before older ones → reordering, and silent message loss when dedup is enabled

After an idle disconnect, this resume path can replicate a newer batch before the older one it is holding, which breaks replication ordering and — with deduplication enabled — silently drops the older message.

Sequence: when a wait-read completes while state == Disconnected, this branch holds that batch (call it R1, the older positions) and reschedules it ~100ms later, while also calling startProducer(). startProducer()setProducerAndTriggerReadEntries() flips the state to Started and immediately calls readMoreEntries():

protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
/**
* 1. Try change state to {@link Started}.
* 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
* producer when the state is {@link Started}.
*/
Pair<Boolean, State> changeStateRes;
changeStateRes = compareSetAndGetState(Starting, Started);
if (changeStateRes.getLeft()) {
if (!(producer instanceof ProducerImpl)) {
log.error("The partitions count between two clusters is not the same, "
+ "the replicator can not be created successfully");
doCloseProducerAsync(producer, () -> {});
throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
}
this.producer = (ProducerImpl) producer;
// Trigger a new read.
log.info("Created replicator producer");
backOff.reset();
// activate cursor: so, entries can be cached.
this.cursor.setActive();
// read entries
readMoreEntries();
} else {

Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.

Two consequences:

  • Replication order across the reconnect can be violated.
  • The receiving cluster dedups replicated messages by source position, not wire order:

synchronized (highestSequencedPushed) {
Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey);
Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey);
if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null
&& (replSequenceLId < lastSequenceLIdPushed.longValue()
|| (replSequenceLId == lastSequenceLIdPushed.longValue()
&& replSequenceEId <= lastSequenceEIdPushed.longValue()))) {
log.debug()

Once R2's higher position is recorded, R1 (lower position) is classified a duplicate and discarded on the remote, while the source side mark-deletes R1 as delivered on the (successful) send callback:

public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
replicator.log.error()
.attr("inFlightTasks", replicator.inFlightTasks)
.attr("pendingQueueSize", replicator.producer.getPendingQueueSize())
.exception(exception)
.log("Error producing on remote broker");
// cursor should be rewound since it was incremented when readMoreEntries
replicator.beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing);
replicator.doRewindCursor(false);
} else {
replicator.log.debug()
.exception(exception)
.log("Message persisted on remote broker");
inFlightTask.incCompletedEntries();
replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
}

i.e. permanent silent message loss across the reconnect. (Loss requires topic dedup enabled; without it the symptom is out-of-order replication, which is still a correctness issue for ordered consumers.)

The previous design avoided this: cursor.rewind() on (re)connect plus pending-read cancellation guaranteed in-order resume from markDelete + 1. Could we hold the new read until the held batch has been replicated (or rewind / cancel the pending read on disconnect) so resume is strictly in order?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a replicator dedup related PR in #25860 by @void-ptr974

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.

Could you point why the entries is not null?

Comment on lines +286 to +290
// Has backlog.
long backlog = getNumberOfEntriesInBacklog();
if (backlog > 0) {
return;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[QUALITY] Loss-safety now rests entirely on the backlog == 0 precondition — worth documenting the invariant

It would help to document (a comment, or an assertion) that this backlog == 0 precondition is now load-bearing for correctness. Since both the cursor.rewind() on reconnect and the afterDisconnected() rewind were removed, nothing re-reads entries that are read-but-not-yet-acked at producer restart. No-loss safety now rests entirely on (a) this backlog == 0 check and (b) beforeDisconnect() rejecting any in-flight task below the last confirmed entry:

protected CompletableFuture<Void> beforeDisconnect() {
// Ensure no in-flight task.
synchronized (inFlightTasks) {
for (PersistentReplicator.InFlightTask task : inFlightTasks) {
if (!task.isDone() && task.readPos.compareTo(cursor.getManagedLedger().getLastConfirmedEntry()) < 0) {
return CompletableFuture.failedFuture(new BrokerServiceException
.TopicBusyException("Cannot close a replicator with backlog"));
}
}
return CompletableFuture.completedFuture(null);
}
}

There's no construct of a loss case under these guards currently, so this is fine as written — but the invariant is implicit and easy to break. A future disconnect trigger that doesn't enforce backlog == 0, or a regression in beforeDisconnect(), would silently drop those entries, with no rewind left as a safety net. A short comment stating the requirement would make that explicit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not understand what concern is it

@poorbarcode poorbarcode requested a review from lhotari June 16, 2026 04:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ready-to-test release/4.0.12 release/4.2.3 type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants