Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
1e5bdb2
[fix] PIP-457: Remove support for V1 topic names and V1 Admin API (im…
merlimat Feb 27, 2026
9b6b1c7
[fix] PIP-457: Fix checkstyle issues from GLOBAL_CLUSTER removal
merlimat Mar 9, 2026
64f3b8d
[fix] PIP-457: Fix import order in AdminResource (HashSet before List)
merlimat Mar 9, 2026
5ef0583
Fixed some tests
merlimat Mar 9, 2026
b10e8c2
checkstyle
merlimat Mar 9, 2026
d718066
More test fixes
merlimat Mar 10, 2026
13425f3
Fixed integration tests using v1 topics
merlimat Mar 10, 2026
5ed7124
checkstyle
merlimat Mar 10, 2026
2bbfdc6
[fix] PIP-457: Update tests to use V2 topic/namespace names
merlimat Mar 10, 2026
96bc060
[fix] PIP-457: Fix remaining V1 topic/namespace name remnants and che…
merlimat Mar 10, 2026
eb06772
[fix] PIP-457: Fix ProxyServiceStarter tests using non-existent names…
merlimat Mar 10, 2026
8fed34f
[fix] PIP-457: Fix namespace isolation policy tests for V2 namespaces
merlimat Mar 10, 2026
41906d2
[fix] PIP-457: Fix V1 websocket URLs in ProxyPublishConsumeTlsTest
merlimat Mar 10, 2026
5dd93b2
[fix] PIP-457: Fix V1 getNamespaces(tenant, cluster) call in AdminApi…
merlimat Mar 10, 2026
ab7d199
Fixed MessageIdTest
merlimat Mar 10, 2026
7c46866
Fix V1 namespace patterns in ModularLoadManagerImplTest
merlimat Mar 10, 2026
5fd4dba
Fixed BundlesQuotasTest
merlimat Mar 10, 2026
fa9edfd
Fix V1 namespace patterns and missing setup across broker tests
merlimat Mar 10, 2026
a664518
BrokerServiceLookupTest
merlimat Mar 10, 2026
c3e75e5
Fix V1 namespace in ProxyStatsTest
merlimat Mar 10, 2026
2e57c6d
Fix ProxyProtocolTest namespace setup and CompactionRetentionTest sys…
merlimat Mar 10, 2026
94cdb6b
Fix V1 namespace in proxy tests: sample/local -> public/default
merlimat Mar 10, 2026
034d0c4
Fix V1 namespace remnants across broker, admin, and client tests
merlimat Mar 10, 2026
a1090d3
Fix missing namespace setup in ProxyServiceStarterDisableZeroCopyTest
merlimat Mar 10, 2026
0c2ea26
Fix TopicsConsumerImplTest and CompactionRetentionTest failures
merlimat Mar 10, 2026
a797a23
removed unused import
merlimat Mar 10, 2026
a3585dd
Add missing @BeforeMethod to ProxyProtocolTest and TlsSniTest
merlimat Mar 10, 2026
46a7a48
Fix ResendRequestTest.testFailoverInactiveConsumer hash-dependent ass…
merlimat Mar 10, 2026
bcbd267
Fix OpenTelemetryBrokerOperabilityStatsTest connection count assertions
merlimat Mar 10, 2026
8ba3612
Fixed pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/Admi…
merlimat Mar 10, 2026
d71e047
Fix NamespacesTest failures after V1 removal
merlimat Mar 10, 2026
3c45c07
Fix managed ledger metrics namespace including domain suffix
merlimat Mar 10, 2026
a806e6c
Fix SLA namespace lookup and test namespace references after V1 removal
merlimat Mar 10, 2026
ed6315d
Merge remote-tracking branch 'apache/master' into pip-457-impl
merlimat Mar 11, 2026
6bf422f
Merge remote-tracking branch 'apache/master' into pip-457-impl
merlimat Mar 11, 2026
8d6ec09
Reject V1 topic names with cluster component in TopicName parsing
merlimat Mar 11, 2026
6340935
Restore canUpdateCluster validation for tenant updates
merlimat Mar 11, 2026
2de82eb
Revert metrics namespace label change to avoid breaking dashboards
merlimat Mar 11, 2026
09ed1c0
Fix test failures from V1 topic name rejection
merlimat Mar 11, 2026
9882747
Fix test failures from V1 topic name rejection
merlimat Mar 11, 2026
acc47b9
Remove deprecated getNamespaces(tenant, cluster) API
merlimat Mar 12, 2026
24cf28e
Merge remote-tracking branch 'apache/master' into pip-457-impl
merlimat Mar 12, 2026
9484f09
Merge branch 'master' into pip-457-impl
lhotari Mar 13, 2026
a87bb01
Fix ReplicatorGlobalNSTest setup failing on canUpdateCluster validation
merlimat Mar 13, 2026
2f48a4d
Fix ReplicatorGlobalNSTest setup failing on canUpdateCluster validation
merlimat Mar 13, 2026
28a0efe
Fix flaky OneWayReplicatorTestBase.setTopicLevelClusters NPE
merlimat Mar 13, 2026
7bcd52a
Fix flaky OneWayReplicatorTestBase.setTopicLevelClusters NPE
merlimat Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public void testTlsLargeSizeMessage() throws Exception {
internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
internalSetUpForNamespace();

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1")
.create();
for (int i = 0; i < 10; i++) {
byte[] message = new byte[messageSize];
Expand Down Expand Up @@ -83,7 +83,7 @@ public void testTlsClientAuthOverBinaryProtocol() throws Exception {
// Test 1 - Using TLS on binary protocol without sending certs - expect failure
internalSetUpForClient(false, pulsar.getBrokerServiceUrlTls());
try {
pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
Assert.fail("Server should have failed the TLS handshake since client didn't .");
} catch (Exception ex) {
Expand All @@ -94,7 +94,7 @@ public void testTlsClientAuthOverBinaryProtocol() throws Exception {
// Test 2 - Using TLS on binary protocol - sending certs
internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
try {
pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
log.info("second test success: with certs set, consumer sub success");
} catch (Exception ex) {
Expand All @@ -113,7 +113,7 @@ public void testTlsClientAuthOverHTTPProtocol() throws Exception {
// Test 1 - Using TLS on https without sending certs - expect failure
internalSetUpForClient(false, pulsar.getWebServiceAddressTls());
try {
pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
Assert.fail("Server should have failed the TLS handshake since client didn't .");
} catch (Exception ex) {
Expand All @@ -124,7 +124,7 @@ public void testTlsClientAuthOverHTTPProtocol() throws Exception {
// Test 2 - Using TLS on https - sending certs
internalSetUpForClient(true, pulsar.getWebServiceAddressTls());
try {
pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
log.info("second test success: with certs set, consumer sub success");
} catch (Exception ex) {
Expand Down
4 changes: 0 additions & 4 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',
# the topic cannot be automatically created.
allowAutoTopicCreationWithLegacyNamingScheme=true

# If 'strictSubscriptionNameVerification' is true, the new subscription name can only contain (a-zA-Z_0-9) and these
# special chars -=:.
strictlyVerifySubscriptionName=false
Expand Down
4 changes: 0 additions & 4 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1311,10 +1311,6 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',
# the topic cannot be automatically created.
allowAutoTopicCreationWithLegacyNamingScheme=true

# If 'strictSubscriptionNameVerification' is true, the new subscription name can only contain (a-zA-Z_0-9) and these
# special chars -=:.
strictlyVerifySubscriptionName=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public void testOfflineTopicBacklog() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("property/cluster/namespace/my-ledger", config);
ManagedLedger ledger = factory.open("property/namespace/my-ledger", config);
ManagedCursor cursor = ledger.openCursor("c1");

int num = 1;
Expand All @@ -560,7 +560,7 @@ public void testOfflineTopicBacklog() throws Exception {
ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(
DigestType.CRC32, "".getBytes(StandardCharsets.UTF_8), "", false);
PersistentOfflineTopicStats offlineTopicStats = offlineTopicBacklog.getEstimatedUnloadedTopicBacklog(
(ManagedLedgerFactoryImpl) factory, "property/cluster/namespace/my-ledger");
(ManagedLedgerFactoryImpl) factory, "property/namespace/my-ledger");
assertNotNull(offlineTopicStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2360,11 +2360,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private TopicType allowAutoTopicCreationType = TopicType.NON_PARTITIONED;
@FieldContext(category = CATEGORY_SERVER, dynamic = true,
doc = "If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',"
+ "the topic cannot be automatically created."
)
private boolean allowAutoTopicCreationWithLegacyNamingScheme = true;
@FieldContext(category = CATEGORY_SERVER, dynamic = true,
doc = "If 'strictSubscriptionNameVerification' is true, the new subscription name can only contain"
+ " (a-zA-Z_0-9) and these special chars -=:."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,19 +462,7 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
}

private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
return checkPermission(topicName, role, action).thenCompose(permission ->
permission ? checkCluster(topicName) : CompletableFuture.completedFuture(false));
}

private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
if (topicName.isGlobal() || conf.getClusterName().equals(topicName.getCluster())) {
return CompletableFuture.completedFuture(true);
}
if (log.isDebugEnabled()) {
log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), conf.getClusterName());
}
return pulsarResources.getClusterResources().listAsync()
.thenApply(clusters -> clusters.contains(topicName.getCluster()));
return checkPermission(topicName, role, action);
}

public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.resources;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -48,8 +47,8 @@ public CompletableFuture<List<String>> listTenantsAsync() {

public CompletableFuture<Void> deleteTenantAsync(String tenantName) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenantName))
.thenCompose(clusters -> FutureUtil.waitForAll(clusters.stream()
.map(cluster -> getCache().delete(joinPath(BASE_POLICIES_PATH, tenantName, cluster)))
.thenCompose(namespaces -> FutureUtil.waitForAll(namespaces.stream()
.map(ns -> getCache().delete(joinPath(BASE_POLICIES_PATH, tenantName, ns)))
.collect(Collectors.toList()))
).thenCompose(__ -> deleteAsync(joinPath(BASE_POLICIES_PATH, tenantName)));
}
Expand Down Expand Up @@ -85,124 +84,79 @@ public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
List<String> namespaces = new ArrayList<>();

// this will return a cluster in v1 and a namespace in v2
for (String clusterOrNamespace : getChildren(joinPath(BASE_POLICIES_PATH, tenant))) {
// Then get the list of namespaces
final List<String> children = getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterOrNamespace));
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace created
// with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list
try {
if (get(joinPath(BASE_POLICIES_PATH, namespace)).isPresent()) {
namespaces.add(namespace);
}
} catch (MetadataStoreException.ContentDeserializationException e) {
// not a namespace node
for (String ns : getChildren(joinPath(BASE_POLICIES_PATH, tenant))) {
String namespace = NamespaceName.get(tenant, ns).toString();
try {
if (get(joinPath(BASE_POLICIES_PATH, namespace)).isPresent()) {
namespaces.add(namespace);
}

} else {
children.forEach(ns -> {
namespaces.add(NamespaceName.get(tenant, clusterOrNamespace, ns).toString());
});
} catch (MetadataStoreException.ContentDeserializationException e) {
// not a namespace node
}
}

return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
// this will return a cluster in v1 and a namespace in v2
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
} else {
CompletableFuture<List<String>> ret = new CompletableFuture();
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
.toString()).collect(Collectors.toList()));
return ret;
}
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
.thenCompose(nsList -> nsList.stream().map(key -> {
String namespace = NamespaceName.get(tenant, key).toString();
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent()
? List.of(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
}).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<Void> hasActiveNamespace(String tenant) {
CompletableFuture<Void> activeNamespaceFuture = new CompletableFuture<>();
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant)).thenAccept(clusterOrNamespaceList -> {
if (clusterOrNamespaceList == null || clusterOrNamespaceList.isEmpty()) {
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant)).thenAccept(nsList -> {
if (nsList == null || nsList.isEmpty()) {
activeNamespaceFuture.complete(null);
return;
}
List<CompletableFuture<Void>> activeNamespaceListFuture = new ArrayList<>();
clusterOrNamespaceList.forEach(clusterOrNamespace -> {
// get list of active V1 namespace
nsList.forEach(ns -> {
CompletableFuture<Void> checkNs = new CompletableFuture<>();
activeNamespaceListFuture.add(checkNs);
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, clusterOrNamespace))
.whenComplete((children, ex) -> {
if (ex != null) {
checkNs.completeExceptionally(ex);
return;
}
if (children != null && !children.isEmpty()) {
checkNs.completeExceptionally(
new IllegalStateException("The tenant still has active namespaces"));
return;
}
String namespace = NamespaceName.get(tenant, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created
// with the v1 admin format (prop/cluster/ns) and then deleted, so no need to
// add it to the list
getAsync(joinPath(BASE_POLICIES_PATH, namespace)).thenApply(data -> {
if (data.isPresent()) {
checkNs.completeExceptionally(new IllegalStateException(
"The tenant still has active namespaces"));
} else {
checkNs.complete(null);
}
return null;
}).exceptionally(ex2 -> {
if (ex2.getCause() instanceof MetadataStoreException.ContentDeserializationException) {
// it's not a valid namespace-node
checkNs.complete(null);
} else {
checkNs.completeExceptionally(ex2);
}
return null;
});
});
FutureUtil.waitForAll(activeNamespaceListFuture).thenAccept(r -> {
activeNamespaceFuture.complete(null);
}).exceptionally(ex -> {
activeNamespaceFuture.completeExceptionally(ex.getCause());
String namespace = NamespaceName.get(tenant, ns).toString();
getAsync(joinPath(BASE_POLICIES_PATH, namespace)).thenApply(data -> {
if (data.isPresent()) {
checkNs.completeExceptionally(new IllegalStateException(
"The tenant still has active namespaces"));
} else {
checkNs.complete(null);
}
return null;
}).exceptionally(ex2 -> {
if (ex2.getCause() instanceof MetadataStoreException.ContentDeserializationException) {
// it's not a valid namespace-node
checkNs.complete(null);
} else {
checkNs.completeExceptionally(ex2);
}
return null;
});
});
FutureUtil.waitForAll(activeNamespaceListFuture).thenAccept(r -> {
activeNamespaceFuture.complete(null);
}).exceptionally(ex -> {
activeNamespaceFuture.completeExceptionally(ex.getCause());
return null;
});
}).exceptionally(ex -> {
activeNamespaceFuture.completeExceptionally(ex.getCause());
return null;
Expand Down
Loading
Loading