[fix][broker]Do not trigger topic GC if replication is still active#25915
[fix][broker]Do not trigger topic GC if replication is still active#25915poorbarcode wants to merge 14 commits into
Conversation
codelipenghui
left a comment
There was a problem hiding this comment.
@poorbarcode I'm thinking another solution
- Close the replicator producer if the producer is idle for a while (e.g. 10 mins)
- Check all the producers for detecting the inactive topic (now it's more complicated to skip the replicator producer)
- Only delete the inactive topic if there is no producers (including the replicator producer)
Now, your solution added 7 days delay for inactive topic deletion if geo-replication enable. But if there is no messages from last 7 days, the issue can still happen, right?
I agree with this concern. A replicated topic can be quiet for longer than the threshold while the replication relationship is still valid. Using |
|
@codelipenghui @void-ptr974 Changed the solution as @codelipenghui suggested, please review again |
f4194c9 to
0c2ce19
Compare
|
@codelipenghui @lhotari @void-ptr974 Please review the PR again. I have changed the solution:
|
| // Start producer and retry. | ||
| if (state == Disconnected) { | ||
| startProducer(); | ||
| retryReplicateEntries.run(); | ||
| return; |
There was a problem hiding this comment.
[BUG] Auto-resume can replicate newer positions before older ones → reordering, and silent message loss when dedup is enabled
After an idle disconnect, this resume path can replicate a newer batch before the older one it is holding, which breaks replication ordering and — with deduplication enabled — silently drops the older message.
Sequence: when a wait-read completes while state == Disconnected, this branch holds that batch (call it R1, the older positions) and reschedules it ~100ms later, while also calling startProducer(). startProducer() → setProducerAndTriggerReadEntries() flips the state to Started and immediately calls readMoreEntries():
Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.
Two consequences:
- Replication order across the reconnect can be violated.
- The receiving cluster dedups replicated messages by source position, not wire order:
Once R2's higher position is recorded, R1 (lower position) is classified a duplicate and discarded on the remote, while the source side mark-deletes R1 as delivered on the (successful) send callback:
i.e. permanent silent message loss across the reconnect. (Loss requires topic dedup enabled; without it the symptom is out-of-order replication, which is still a correctness issue for ordered consumers.)
The previous design avoided this: cursor.rewind() on (re)connect plus pending-read cancellation guaranteed in-order resume from markDelete + 1. Could we hold the new read until the held batch has been replicated (or rewind / cancel the pending read on disconnect) so resume is strictly in order?
There was a problem hiding this comment.
There's also a replicator dedup related PR in #25860 by @void-ptr974
There was a problem hiding this comment.
Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.
Could you point why the entries is not null?
| // Has backlog. | ||
| long backlog = getNumberOfEntriesInBacklog(); | ||
| if (backlog > 0) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
[QUALITY] Loss-safety now rests entirely on the backlog == 0 precondition — worth documenting the invariant
It would help to document (a comment, or an assertion) that this backlog == 0 precondition is now load-bearing for correctness. Since both the cursor.rewind() on reconnect and the afterDisconnected() rewind were removed, nothing re-reads entries that are read-but-not-yet-acked at producer restart. No-loss safety now rests entirely on (a) this backlog == 0 check and (b) beforeDisconnect() rejecting any in-flight task below the last confirmed entry:
There's no construct of a loss case under these guards currently, so this is fine as written — but the invariant is implicit and easy to break. A future disconnect trigger that doesn't enforce backlog == 0, or a regression in beforeDisconnect(), would silently drop those entries, with no rewind left as a safety net. A short comment stating the requirement would make that explicit.
There was a problem hiding this comment.
I can not understand what concern is it
Motivation
Replication is stuck due to the Topic GC
primary clusterandbackup clusterprimary cluster: Publishing messages intoprimary clusterbackup cluster: No consumer/producer is registered.backup cluster: Check topic GCremote producerbackup cluster: the topic GC progress is waiting for the remote-producer to be disconnected.backup cluster: backlog increases because the replicator was closedModifications
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes