[fix][broker] System topic was created with different partitions across clusters after enabling namespace-level replication #25312
Codecov Report
Coverage diff, master vs. #25312:
| Metric | master | #25312 | +/- |
|---|---|---|---|
| Coverage | 72.63% | 72.70% | +0.07% |
| Complexity | 34632 | 34236 | -396 |
| Files | 1967 | 1954 | -13 |
| Lines | 156326 | 154769 | -1557 |
| Branches | 17812 | 17713 | -99 |
| Hits | 113542 | 112521 | -1021 |
| Misses | 33713 | 33184 | -529 |
| Partials | 9071 | 9064 | -7 |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
assertTrue(persistentTopic1.getReplicators().isEmpty());
PersistentTopic persistentTopic2 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
assertTrue(persistentTopic2.getReplicators().isEmpty());
Suggested change:
- PersistentTopic persistentTopic2 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+ PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(tp, false).join().get();
if (actEx instanceof PulsarClientException.NotFoundException
        || actEx instanceof PulsarClientException.TopicDoesNotExistException
        || actEx instanceof PulsarAdminException.NotFoundException) {
    future.complete(TopicExistsInfo.newTopicNotExists());
If there are 3+ clusters, this branch returns TopicExistsInfo.newTopicNotExists() as soon as the first remote cluster reports that the topic is missing; the later clusters are never consulted.
Yes, the first remote cluster is taken as the standard. Since the second cluster was already compared with the first cluster when the topic was created, we can assume that the older clusters are consistent with each other.
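The "first remote cluster as the standard" rule discussed in this thread can be sketched as follows. This is a minimal model under stated assumptions, not the PR's code: the class, the method, and the map of partition counts are hypothetical, and the value 0 is used here to mean a non-partitioned topic.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical model; none of these names come from the Pulsar code base. */
final class FirstRemoteAsStandard {

    /**
     * Looks the topic up on the first remote cluster only and treats its answer as the standard.
     * An empty result means "topic does not exist"; later clusters are never consulted,
     * on the assumption that they were already aligned with the first one at creation time.
     */
    static OptionalInt resolveFromFirstRemote(
            List<String> remoteClusters, Map<String, Integer> partitionsByCluster) {
        if (remoteClusters.isEmpty()) {
            return OptionalInt.empty();
        }
        Integer partitions = partitionsByCluster.get(remoteClusters.get(0));
        return partitions == null ? OptionalInt.empty() : OptionalInt.of(partitions);
    }
}
```

With remote clusters [c2, c3] where only c3 holds the topic, this model reports the topic as missing, which is exactly the behavior the review comment points out and the author accepts.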
Motivation
The situation in which replication does not work as expected:
- c1, namespace public/default: the system topic __change_events has already been created as a non-partitioned topic; in other words, the topic named __change_events has no partitions.
- c2 has the following configurations:
  - allowAutoTopicCreationType=partitioned
  - defaultNumPartitions=1
  These two configurations mean that a new topic will be created as a partitioned topic with 1 partition.

If we enable replication for the namespace between the two clusters, the following happens:
- c2 creates a partitioned topic __change_events, which has one partition named __change_events-partition-0.
- c1 starts replication for the non-partitioned topic __change_events, which triggers the creation of a non-partitioned topic on the c2 cluster.
- c2 also starts replication to c1. Since it has a partitioned topic with a partition named __change_events-partition-0, it triggers the creation of a partitioned topic on the c1 cluster.

Eventually, both clusters hold both a partitioned topic and a non-partitioned topic with the same name:
- admin/partitioned-topic: {partitions: 1}
- managed-ledger/persistent: [__change_events-partition-0, __change_events]

Because this is a system topic, this leads to fragmentation and causes serious problems.
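The sequence above can be replayed with a toy model. This is only a sketch under stated assumptions: the Cluster class, its fields, and the replicateTo method are hypothetical stand-ins for the metadata store and the replicator, and replication is simplified to "create the same-named topic on the peer without checking the peer's topic type".

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the scenario; every name here is hypothetical, not Pulsar code. */
final class FragmentationSketch {
    static final String TOPIC = "__change_events";

    static final class Cluster {
        /** Stand-in for the managed-ledger/persistent entries. */
        final Set<String> managedLedgers = new TreeSet<>();
        /** Stand-in for the partitioned-topic metadata; 0 means none recorded. */
        int partitions = 0;

        void createNonPartitioned() {
            managedLedgers.add(TOPIC);
        }

        void createPartitioned(int count) {
            partitions = count;
            for (int i = 0; i < count; i++) {
                managedLedgers.add(TOPIC + "-partition-" + i);
            }
        }

        /** Creates each local topic on the peer by name, ignoring the peer's existing topic type. */
        void replicateTo(Cluster peer) {
            for (String name : managedLedgers) {
                peer.managedLedgers.add(name);
                if (name.contains("-partition-")) {
                    peer.partitions = Math.max(peer.partitions, partitions);
                }
            }
        }
    }

    /** Replays the steps from the description and returns {c1, c2}. */
    static Cluster[] simulate() {
        Cluster c1 = new Cluster();
        Cluster c2 = new Cluster();
        c1.createNonPartitioned();  // pre-existing non-partitioned topic on c1
        c2.createPartitioned(1);    // auto-created on c2 with defaultNumPartitions=1
        c1.replicateTo(c2);         // c2 gains the non-partitioned topic
        c2.replicateTo(c1);         // c1 gains the partition and its metadata
        return new Cluster[] {c1, c2};
    }
}
```

In this model both clusters finish with the same two ledger names and a partition count of 1, matching the fragmented end state described above.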
Modifications
- consumer/producer's starting
- broker.conf -> isCreateTopicToRemoteClusterForReplication is true

This PR will not cause an infinite loop of queries between the clusters. The local cluster queries the remote cluster for the topic type and partition count, and on the remote side the request is handled by handlePartitionMetadataRequest. When determining the auto-creation policy, the remote cluster would otherwise query the local cluster again. However, the initial query from the local cluster is already limited to existing topics only, so the remote cluster never executes the logic for fetching the auto-creation policy.
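The loop-avoidance argument can be illustrated with a small model. It is a sketch, not the broker's implementation: the Cluster class, the existingTopicsOnly flag, and the hopsLeft guard are hypothetical, and handleMetadataRequest only stands in for the role handlePartitionMetadataRequest plays in the description. The hop limit exists solely so the unrestricted path terminates in this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical model of the cross-cluster metadata lookup; not Pulsar code. */
final class ExistingOnlyLookupSketch {

    static final class Cluster {
        /** Topic name to partition count; 0 stands for a non-partitioned topic. */
        final Map<String, Integer> topics = new HashMap<>();
        Cluster peer;
        int lookupsServed = 0;

        /** Serves a metadata request coming from the peer cluster. */
        OptionalInt handleMetadataRequest(String topic, boolean existingTopicsOnly, int hopsLeft) {
            lookupsServed++;
            Integer partitions = topics.get(topic);
            if (partitions != null) {
                return OptionalInt.of(partitions);
            }
            if (existingTopicsOnly || hopsLeft == 0) {
                // Restricted request: the auto-creation policy is never evaluated,
                // so nothing is sent back to the caller's cluster.
                return OptionalInt.empty();
            }
            // Unrestricted request: deciding the auto-creation policy asks the peer again.
            return peer.handleMetadataRequest(topic, false, hopsLeft - 1);
        }
    }

    /** Builds two empty clusters that point at each other and returns {local, remote}. */
    static Cluster[] pair() {
        Cluster local = new Cluster();
        Cluster remote = new Cluster();
        local.peer = remote;
        remote.peer = local;
        return new Cluster[] {local, remote};
    }
}
```

Calling `remote.handleMetadataRequest(topic, true, 4)` for a missing topic is answered in a single hop, while the same call with `false` bounces between the two clusters until the hop limit runs out.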
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
Matching PR in forked repository
PR in forked repository: x