
[fix][broker] Topic policies loading with timeout and retries#25295

Open
programmerahul wants to merge 2 commits into apache:master from programmerahul:timeoutInPolicies

Conversation

@programmerahul

Fixes [#25294]

Motivation

When loading the system topic policies cache in the broker, there is currently no timeout or retry. As a result, cases have been observed where the reader gets stuck at some point, the topic policies never finish loading, and the affected topics become unavailable.

Modifications

This PR adds a configurable timeout and retry count for loading the topic policies cache.
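As a rough illustration of the mechanism (a minimal, self-contained sketch, not the broker code: `loadCache`, the 200 ms timeout, and the retry count are placeholders), a timeout-plus-retry wrapper around an async load can look like this, using the same `thenApply` / `exceptionally` / `thenCompose` shape as the PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class TimeoutRetrySketch {
    // Placeholder for the async cache-load operation: the first two attempts
    // simulate a stuck reader by returning a future that never completes.
    static CompletableFuture<Void> loadCache(AtomicInteger attempts) {
        if (attempts.incrementAndGet() < 3) {
            return new CompletableFuture<>();
        }
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> loadWithRetry(AtomicInteger attempts, int retriesLeft) {
        CompletableFuture<Void> attempt = loadCache(attempts)
                // orTimeout fails the attempt with TimeoutException if it hangs
                .orTimeout(200, TimeUnit.MILLISECONDS);
        return attempt
                .thenApply(v -> CompletableFuture.completedFuture(v))
                .exceptionally(ex -> {
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof TimeoutException && retriesLeft > 0) {
                        // Timed out: retry with one fewer attempt remaining
                        return loadWithRetry(attempts, retriesLeft - 1);
                    }
                    // Non-timeout failure, or retries exhausted: propagate
                    return CompletableFuture.failedFuture(cause);
                })
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        loadWithRetry(attempts, 3).get(5, TimeUnit.SECONDS);
        System.out.println("attempts=" + attempts.get()); // two timeouts, third succeeds
    }
}
```

In this toy setup the first two attempts time out and the third succeeds, so the future completes normally after three attempts.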

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • added a test to check that a retry occurs when a stuck reader is present
  • added a test to check that topics are unloaded after all retries are exhausted

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics: a metric is emitted when topic policies loading fails
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: programmerahul#1

@github-actions github-actions bot added the doc-required (Your PR changes impact docs and you will update later.) label Mar 6, 2026
@codelipenghui codelipenghui added this to the 4.2.0 milestone Mar 9, 2026
@codelipenghui codelipenghui added the type/bug (The PR fixed a bug or issue reported a bug), release/4.0.10, and release/4.1.4 labels Mar 9, 2026
Contributor

Copilot AI left a comment


Pull request overview

Adds timeout + retry semantics to broker-side system topic policies cache initialization to prevent stuck readers from blocking topic availability, and emits metrics/logs when initialization fails.

Changes:

  • Add configurable timeout and max-retry settings for topic policies cache initialization.
  • Implement timeout detection, cleanup, retry, and final “unload bundles” fallback on retry exhaustion.
  • Add tests covering timeout + retry and timeout + retry exhaustion behaviors.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.

File Description
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java Implements timeout/retry logic for cache init, adds Prometheus counters, and unloads bundles on repeated timeouts.
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java Adds new broker configs for cache init timeout and max retries.
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java Adds tests for timeout detection, retries, and unload behavior after retry exhaustion.


Comment on lines +91 to +97
.register();

private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")
.register();

Copilot AI Mar 9, 2026


Static Counter.build(...).register() registers into the global default Prometheus registry at class-load time. In long-running brokers and especially in unit/integration tests that may load/reload components in the same JVM, this pattern can trigger duplicate-collector registration errors and makes metric lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing metrics/registry facilities (or a registry owned by the broker instance) rather than static global registration in the service class.

Suggested change
-             .register();
-     private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
-             "pulsar_topic_policies_cache_init_timeouts_total",
-             "Total number of topic policies cache initialization timeouts (including retried attempts)")
-             .labelNames("namespace")
-             .register();
+             .create();
+     private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+             "pulsar_topic_policies_cache_init_timeouts_total",
+             "Total number of topic policies cache initialization timeouts (including retried attempts)")
+             .labelNames("namespace")
+             .create();

private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
"pulsar_topic_policies_cache_init_failures_total",
"Total number of topic policies cache initialization failures after all retries exhausted")
.labelNames("namespace")

Copilot AI Mar 9, 2026


Labeling these counters by full namespace can create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.

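One of the low-cardinality options the comment mentions is labeling by tenant only, since cardinality is then bounded by the number of tenants rather than namespaces. Extracting the tenant from a `tenant/namespace` string is straightforward; this is a hypothetical sketch (the real code would use Pulsar's NamespaceName helper rather than raw string handling):

```java
class TenantLabelSketch {
    // Pulsar namespaces have the form "tenant/namespace"; labeling metrics
    // by the tenant part only bounds cardinality at the number of tenants.
    static String tenantOf(String namespace) {
        int slash = namespace.indexOf('/');
        return slash < 0 ? namespace : namespace.substring(0, slash);
    }

    public static void main(String[] args) {
        System.out.println(tenantOf("my-tenant/my-namespace")); // my-tenant
    }
}
```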
Comment on lines +89 to +96
"Total number of topic policies cache initialization failures after all retries exhausted")
.labelNames("namespace")
.register();

private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")

Copilot AI Mar 9, 2026


Labeling these counters by full namespace can create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.

Suggested change
-             "Total number of topic policies cache initialization failures after all retries exhausted")
-             .labelNames("namespace")
-             .register();
-     private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
-             "pulsar_topic_policies_cache_init_timeouts_total",
-             "Total number of topic policies cache initialization timeouts (including retried attempts)")
-             .labelNames("namespace")
+             "Total number of topic policies cache initialization failures after all retries exhausted, per tenant")
+             .labelNames("tenant")
+             .register();
+     private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+             "pulsar_topic_policies_cache_init_timeouts_total",
+             "Total number of topic policies cache initialization timeouts (including retried attempts), per tenant")
+             .labelNames("tenant")
log.error("[{}] Failed to initialize topic policies cache",
namespace, ex);
initNamespacePolicyFuture.completeExceptionally(ex);
cleanPoliciesCacheInitMap(namespace, true);

Copilot AI Mar 9, 2026


This changes cleanup behavior from the prior logic that conditioned the boolean argument on the exception type (e.g., isAlreadyClosedException(ex)), and also previously treated reader-creation failures differently. Always passing true can alter shutdown/cleanup semantics and may cause incorrect cleanup decisions for non-close-related failures. Suggest restoring the prior decision logic (e.g., pass isAlreadyClosedException(ex) where appropriate, and keep the more specific branching for reader creation vs. later-stage failures if it impacts cleanup).

Suggested change
- cleanPoliciesCacheInitMap(namespace, true);
+ cleanPoliciesCacheInitMap(namespace,
+         ex instanceof PulsarClientException.AlreadyClosedException
+                 || ex.getCause() instanceof PulsarClientException.AlreadyClosedException);

Comment on lines +660 to +690
return attempt
.thenApply(v -> CompletableFuture.completedFuture(v))
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof TimeoutException) {
TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc();
// Close the stuck reader and remove from cache so a new one can be created
closeAndRemoveReaderForNamespace(namespace);

if (retriesLeft > 0) {
log.warn("[{}] Topic policies cache initialization timed out after {}s. "
+ "Retrying... ({} retries left)",
namespace, timeoutSeconds, retriesLeft);
return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1);
} else {
log.error("[{}] Topic policies cache initialization failed after all retries "
+ "(timed out after {}s per attempt). Unloading namespace bundles "
+ "from this broker.",
namespace, timeoutSeconds);
TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc();
unloadNamespaceBundlesAsync(namespace);
return CompletableFuture.<Void>failedFuture(
new BrokerServiceException(
"Topic policies cache initialization failed after all retries "
+ "for namespace " + namespace));
}
}
// For non-timeout exceptions (e.g. reader creation failure), propagate directly
return CompletableFuture.<Void>failedFuture(cause);
})
.thenCompose(Function.identity());

Copilot AI Mar 9, 2026


The thenApply(v -> completedFuture(v)) + exceptionally(...) + thenCompose(identity()) pattern is harder to read and maintain than necessary. Consider rewriting using a single handle/whenComplete-style branch that returns either a value or a next-stage future (and then thenCompose once), or using exceptionallyCompose if the project’s Java target supports it—this will avoid nested futures and make the retry/failure flow clearer.

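For illustration, the flattened shape this comment describes can be sketched with `exceptionallyCompose` (available since Java 12). This is a self-contained toy, with `attemptLoad` as a stand-in for the real per-attempt future, showing how the nested `thenApply`/`exceptionally`/`thenCompose` chain collapses into a single branch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class ExceptionallyComposeSketch {
    // Placeholder attempt: the first attempt fails with a TimeoutException,
    // the second succeeds.
    static CompletableFuture<Void> attemptLoad(AtomicInteger attempts) {
        if (attempts.incrementAndGet() < 2) {
            return CompletableFuture.failedFuture(new TimeoutException("stuck reader"));
        }
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> loadWithRetry(AtomicInteger attempts, int retriesLeft) {
        return attemptLoad(attempts)
                // exceptionallyCompose lets the error branch return another
                // future directly, avoiding nested CompletableFutures
                .exceptionallyCompose(ex -> {
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof TimeoutException && retriesLeft > 0) {
                        return loadWithRetry(attempts, retriesLeft - 1);
                    }
                    return CompletableFuture.failedFuture(cause);
                });
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        loadWithRetry(attempts, 3).get(5, TimeUnit.SECONDS);
        System.out.println("attempts=" + attempts.get()); // one timeout, then success
    }
}
```

Whether this is usable here depends on the project's Java language target, as the comment notes.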
Comment on lines +688 to +696
// The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
// returns false on second call)
try {
prepareFuture.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
// Retry may or may not succeed depending on mock setup; the important thing is
// the timeout was detected
}


Copilot AI Mar 9, 2026


This test swallows all exceptions and explicitly allows the retry to “may or may not succeed”, which makes the test non-deterministic and not aligned with its name (“SuccessfulRetry”). It should assert a clear expected outcome (e.g., prepareFuture completes successfully and hasMoreEventsAsync() is invoked twice, or at minimum that a retry occurred via verifiable interactions), otherwise the test can pass even if the retry logic is broken.

Suggested change
- // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
- // returns false on second call)
- try {
-     prepareFuture.get(30, TimeUnit.SECONDS);
- } catch (Exception e) {
-     // Retry may or may not succeed depending on mock setup; the important thing is
-     // the timeout was detected
- }
+ // The first attempt should time out, and the second attempt should succeed (since
+ // hasMoreEventsAsync returns false on the second call)
+ Boolean initResult = prepareFuture.get(30, TimeUnit.SECONDS);
+ assertTrue(initResult);
+ // Verify that hasMoreEventsAsync was invoked twice (initial attempt + retry)
+ verify(mockReader, times(2)).hasMoreEventsAsync();
Comment on lines +691 to +707
prepareFuture.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
// Retry may or may not succeed depending on mock setup; the important thing is
// the timeout was detected
}

// Verify that the timeout was detected and retry was attempted
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains(
"Topic policies cache initialization timed out"));
assertTrue(timeoutLogFound);
});

// Reset config
conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
conf.setTopicPoliciesCacheInitMaxRetries(3);

Copilot AI Mar 9, 2026


The config reset is not in a finally block. If the test fails before reaching these lines, it can leak altered broker config into subsequent tests and cause cascading failures. Consider wrapping the body in try/finally (or use a test framework hook) to guarantee restoration.

Suggested change
-     prepareFuture.get(30, TimeUnit.SECONDS);
- } catch (Exception e) {
-     // Retry may or may not succeed depending on mock setup; the important thing is
-     // the timeout was detected
- }
- // Verify that the timeout was detected and retry was attempted
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-     boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-             logEvent.getMessage().toString().contains(
-                     "Topic policies cache initialization timed out"));
-     assertTrue(timeoutLogFound);
- });
- // Reset config
- conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
- conf.setTopicPoliciesCacheInitMaxRetries(3);
+ try {
+     prepareFuture.get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+     // Retry may or may not succeed depending on mock setup; the important thing is
+     // the timeout was detected
+ }
+ // Verify that the timeout was detected and retry was attempted
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+     boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+             logEvent.getMessage().toString().contains(
+                     "Topic policies cache initialization timed out"));
+     assertTrue(timeoutLogFound);
+ });
+ } finally {
+     // Reset config
+     conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
+     conf.setTopicPoliciesCacheInitMaxRetries(3);
+ }

Comment on lines +760 to +765
boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
String msg = logEvent.getMessage().toString();
return msg.contains("Unloading") && msg.contains("namespace bundles")
|| msg.contains("No owned bundles found to unload");
});
assertTrue(unloadLogFound);

Copilot AI Mar 9, 2026


This assertion is potentially racy because unloading is initiated asynchronously; unlike the failure-log assertion above, it doesn’t wait for the unload log to appear. Use Awaitility (or otherwise synchronize) around this check to make the test deterministic.

Suggested change
- boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
-     String msg = logEvent.getMessage().toString();
-     return msg.contains("Unloading") && msg.contains("namespace bundles")
-             || msg.contains("No owned bundles found to unload");
- });
- assertTrue(unloadLogFound);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+     boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
+         String msg = logEvent.getMessage().toString();
+         return (msg.contains("Unloading") && msg.contains("namespace bundles"))
+                 || msg.contains("No owned bundles found to unload");
+     });
+     assertTrue(unloadLogFound);
+ });

@programmerahul
Author

I will try to address the Copilot review comments by today.


Labels

  • doc-required: Your PR changes impact docs and you will update later.
  • release/4.0.10
  • release/4.1.4
  • type/bug: The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants