[fix][broker] Topic policies loading with timeout and retries #25295
programmerahul wants to merge 2 commits into apache:master from
Conversation
Pull request overview
Adds timeout + retry semantics to broker-side system topic policies cache initialization to prevent stuck readers from blocking topic availability, and emits metrics/logs when initialization fails.
Changes:
- Add configurable timeout and max-retry settings for topic policies cache initialization.
- Implement timeout detection, cleanup, retry, and final “unload bundles” fallback on retry exhaustion.
- Add tests covering timeout + retry and timeout + retry exhaustion behaviors.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | Implements timeout/retry logic for cache init, adds Prometheus counters, and unloads bundles on repeated timeouts. |
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | Adds new broker configs for cache init timeout and max retries. |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java | Adds tests for timeout detection, retries, and unload behavior after retry exhaustion. |
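Based on the setter names exercised in the new tests (`setTopicPoliciesCacheInitTimeoutSeconds`, `setTopicPoliciesCacheInitMaxRetries`) and Pulsar's usual config-naming convention, the two new broker settings presumably surface in `broker.conf` roughly as follows; the exact names and defaults are inferred here, not confirmed against the diff:

```properties
# Hypothetical broker.conf entries inferred from the test code in this PR.
# Per-attempt timeout for initializing the topic policies cache reader.
topicPoliciesCacheInitTimeoutSeconds=300
# Number of retries before giving up and unloading the namespace bundles.
topicPoliciesCacheInitMaxRetries=3
```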
```java
        .register();

private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
        "pulsar_topic_policies_cache_init_timeouts_total",
        "Total number of topic policies cache initialization timeouts (including retried attempts)")
        .labelNames("namespace")
        .register();
```
Static Counter.build(...).register() registers into the global default Prometheus registry at class-load time. In long-running brokers and especially in unit/integration tests that may load/reload components in the same JVM, this pattern can trigger duplicate-collector registration errors and makes metric lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing metrics/registry facilities (or a registry owned by the broker instance) rather than static global registration in the service class.
Suggested change:

```diff
-        .register();
+        .create();
 private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
         "pulsar_topic_policies_cache_init_timeouts_total",
         "Total number of topic policies cache initialization timeouts (including retried attempts)")
         .labelNames("namespace")
-        .register();
+        .create();
```
```java
private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
        "pulsar_topic_policies_cache_init_failures_total",
        "Total number of topic policies cache initialization failures after all retries exhausted")
        .labelNames("namespace")
```
Labeling these counters by full namespace can create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.
```java
        "Total number of topic policies cache initialization failures after all retries exhausted")
        .labelNames("namespace")
        .register();

private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
        "pulsar_topic_policies_cache_init_timeouts_total",
        "Total number of topic policies cache initialization timeouts (including retried attempts)")
        .labelNames("namespace")
```
Suggested change:

```diff
-        "Total number of topic policies cache initialization failures after all retries exhausted")
-        .labelNames("namespace")
+        "Total number of topic policies cache initialization failures after all retries exhausted, per tenant")
+        .labelNames("tenant")
         .register();
 private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
         "pulsar_topic_policies_cache_init_timeouts_total",
-        "Total number of topic policies cache initialization timeouts (including retried attempts)")
-        .labelNames("namespace")
+        "Total number of topic policies cache initialization timeouts (including retried attempts), per tenant")
+        .labelNames("tenant")
```
```java
log.error("[{}] Failed to initialize topic policies cache",
        namespace, ex);
initNamespacePolicyFuture.completeExceptionally(ex);
cleanPoliciesCacheInitMap(namespace, true);
```
This changes cleanup behavior from the prior logic that conditioned the boolean argument on the exception type (e.g., isAlreadyClosedException(ex)), and also previously treated reader-creation failures differently. Always passing true can alter shutdown/cleanup semantics and may cause incorrect cleanup decisions for non-close-related failures. Suggest restoring the prior decision logic (e.g., pass isAlreadyClosedException(ex) where appropriate, and keep the more specific branching for reader creation vs. later-stage failures if it impacts cleanup).
Suggested change:

```diff
-cleanPoliciesCacheInitMap(namespace, true);
+cleanPoliciesCacheInitMap(namespace,
+        ex instanceof PulsarClientException.AlreadyClosedException
+                || ex.getCause() instanceof PulsarClientException.AlreadyClosedException);
```
```java
return attempt
        .thenApply(v -> CompletableFuture.completedFuture(v))
        .exceptionally(ex -> {
            Throwable cause = FutureUtil.unwrapCompletionException(ex);
            if (cause instanceof TimeoutException) {
                TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc();
                // Close the stuck reader and remove from cache so a new one can be created
                closeAndRemoveReaderForNamespace(namespace);

                if (retriesLeft > 0) {
                    log.warn("[{}] Topic policies cache initialization timed out after {}s. "
                            + "Retrying... ({} retries left)",
                            namespace, timeoutSeconds, retriesLeft);
                    return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1);
                } else {
                    log.error("[{}] Topic policies cache initialization failed after all retries "
                            + "(timed out after {}s per attempt). Unloading namespace bundles "
                            + "from this broker.",
                            namespace, timeoutSeconds);
                    TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc();
                    unloadNamespaceBundlesAsync(namespace);
                    return CompletableFuture.<Void>failedFuture(
                            new BrokerServiceException(
                                    "Topic policies cache initialization failed after all retries "
                                            + "for namespace " + namespace));
                }
            }
            // For non-timeout exceptions (e.g. reader creation failure), propagate directly
            return CompletableFuture.<Void>failedFuture(cause);
        })
        .thenCompose(Function.identity());
```
The thenApply(v -> completedFuture(v)) + exceptionally(...) + thenCompose(identity()) pattern is harder to read and maintain than necessary. Consider rewriting using a single handle/whenComplete-style branch that returns either a value or a next-stage future (and then thenCompose once), or using exceptionallyCompose if the project’s Java target supports it—this will avoid nested futures and make the retry/failure flow clearer.
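To illustrate the suggested shape, here is a minimal, self-contained sketch of the same retry flow using `exceptionallyCompose` (added in JDK 12), which returns the next-stage future directly and so needs no nested futures or `thenCompose(identity())`. `attemptInit` and `initWithRetry` are hypothetical stand-ins for the PR's real helpers, not its actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {
    static final AtomicInteger attempts = new AtomicInteger();

    // Stand-in for the real cache-init attempt: times out twice, then succeeds.
    static CompletableFuture<Void> attemptInit() {
        if (attempts.incrementAndGet() < 3) {
            return CompletableFuture.failedFuture(new TimeoutException("init timed out"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Flattened retry flow: exceptionallyCompose lets the error branch return the
    // next-stage future directly, keeping the success path untouched.
    static CompletableFuture<Void> initWithRetry(int retriesLeft) {
        return attemptInit().exceptionallyCompose(ex -> {
            Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
            if (cause instanceof TimeoutException && retriesLeft > 0) {
                return initWithRetry(retriesLeft - 1);
            }
            // Non-timeout failures (or exhausted retries) propagate directly.
            return CompletableFuture.<Void>failedFuture(cause);
        });
    }

    public static void main(String[] args) {
        initWithRetry(3).join();
        System.out.println("attempts=" + attempts.get());
    }
}
```

The metrics increments, reader cleanup, and bundle-unload fallback from the PR would slot into the timeout branch unchanged; only the future plumbing differs.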
```java
// The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
// returns false on second call)
try {
    prepareFuture.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
    // Retry may or may not succeed depending on mock setup; the important thing is
    // the timeout was detected
}
```
This test swallows all exceptions and explicitly allows the retry to “may or may not succeed”, which makes the test non-deterministic and not aligned with its name (“SuccessfulRetry”). It should assert a clear expected outcome (e.g., prepareFuture completes successfully and hasMoreEventsAsync() is invoked twice, or at minimum that a retry occurred via verifiable interactions), otherwise the test can pass even if the retry logic is broken.
Suggested change:

```diff
-// The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
-// returns false on second call)
-try {
-    prepareFuture.get(30, TimeUnit.SECONDS);
-} catch (Exception e) {
-    // Retry may or may not succeed depending on mock setup; the important thing is
-    // the timeout was detected
-}
+// The first attempt should time out, and the second attempt should succeed (since
+// hasMoreEventsAsync returns false on the second call)
+Boolean initResult = prepareFuture.get(30, TimeUnit.SECONDS);
+assertTrue(initResult);
+// Verify that hasMoreEventsAsync was invoked twice (initial attempt + retry)
+verify(mockReader, times(2)).hasMoreEventsAsync();
```
```java
    prepareFuture.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
    // Retry may or may not succeed depending on mock setup; the important thing is
    // the timeout was detected
}

// Verify that the timeout was detected and retry was attempted
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
    boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
            logEvent.getMessage().toString().contains(
                    "Topic policies cache initialization timed out"));
    assertTrue(timeoutLogFound);
});

// Reset config
conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
conf.setTopicPoliciesCacheInitMaxRetries(3);
```
The config reset is not in a finally block. If the test fails before reaching these lines, it can leak altered broker config into subsequent tests and cause cascading failures. Consider wrapping the body in try/finally (or use a test framework hook) to guarantee restoration.
Suggested change (with the braces adjusted so the existing try/catch nests legally inside the new try/finally):

```java
try {
    try {
        prepareFuture.get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
        // Retry may or may not succeed depending on mock setup; the important thing is
        // the timeout was detected
    }

    // Verify that the timeout was detected and retry was attempted
    Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
        boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                logEvent.getMessage().toString().contains(
                        "Topic policies cache initialization timed out"));
        assertTrue(timeoutLogFound);
    });
} finally {
    // Reset config
    conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
    conf.setTopicPoliciesCacheInitMaxRetries(3);
}
```
```java
boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
    String msg = logEvent.getMessage().toString();
    return msg.contains("Unloading") && msg.contains("namespace bundles")
            || msg.contains("No owned bundles found to unload");
});
assertTrue(unloadLogFound);
```
This assertion is potentially racy because unloading is initiated asynchronously; unlike the failure-log assertion above, it doesn’t wait for the unload log to appear. Use Awaitility (or otherwise synchronize) around this check to make the test deterministic.
Suggested change:

```diff
-boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
-    String msg = logEvent.getMessage().toString();
-    return msg.contains("Unloading") && msg.contains("namespace bundles")
-            || msg.contains("No owned bundles found to unload");
-});
-assertTrue(unloadLogFound);
+Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+    boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
+        String msg = logEvent.getMessage().toString();
+        return (msg.contains("Unloading") && msg.contains("namespace bundles"))
+                || msg.contains("No owned bundles found to unload");
+    });
+    assertTrue(unloadLogFound);
+});
```
I will try to address the Copilot review comments by today.
Fixes [#25294]
Motivation
When the broker loads the system topic policies cache, there is no timeout or retry. As a result, cases have been observed where the reader gets stuck at some point, the topic policies are never loaded, and the affected topics become unavailable.
Modifications
This PR adds a configurable timeout and retry count for loading the topic policies cache.
Verifying this change
This change added tests and can be verified as follows:
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete

Matching PR in forked repository
PR in forked repository: programmerahul#1