[improve][client] Extract OTel metrics into ProducerMetrics/ConsumerMetrics classes#25309
[improve][client] Extract OTel metrics into ProducerMetrics/ConsumerMetrics classes#253093424672656 wants to merge 7 commits intoapache:masterfrom
Conversation
|
@3424672656 Please add the following content to your PR description and select a checkbox: |
| consumerNacksCounter.increment(); | ||
| } | ||
|
|
||
| public void recordDlq() { |
There was a problem hiding this comment.
| public void recordDlq() { | |
| public void recordDlqMessageSent() { |
There was a problem hiding this comment.
Pull request overview
Refactors Pulsar client-side OpenTelemetry (OTel) metrics by extracting producer/consumer metric instruments and recording logic out of ProducerImpl/ConsumerImpl into dedicated helper classes, with new unit tests to validate metric emission.
Changes:
- Introduce
ProducerMetricsandConsumerMetricswrappers to centralize OTel instrument creation and recording. - Update
ProducerImplandConsumerImplto delegate metric recording to the new wrappers. - Add unit tests for producer/consumer metric recording using
InMemoryMetricReader.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java | New producer metrics wrapper (counters + histograms) |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java | New consumer metrics wrapper (counters + receive-queue tracking) |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java | Replace inline metric fields/updates with ProducerMetrics delegation |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | Replace inline metric fields/updates with ConsumerMetrics delegation |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java | New unit tests for producer OTel metrics |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java | New unit tests for consumer OTel metrics |
| if (e != null) { | ||
| latencyHistogram.recordFailure(latencyNanos); | ||
| producerMetrics.recordSendFailed(latencyNanos, msgSize); | ||
| stats.incrementSendFailed(); | ||
| onSendAcknowledgement(msg, null, e); | ||
| sendCallback.getFuture().completeExceptionally(e); |
There was a problem hiding this comment.
In onSendComplete, the metrics are recorded using the msgSize field from the callback instance that received sendComplete(), not the size associated with sendCallback/msg currently being processed in the loop. For batched sends where callbacks are chained, this can mis-account pending bytes and published bytes (and any size-based metrics) for all but the first message. Consider deriving the size per message (e.g., from the current sendCallback when it’s a DefaultSendMessageCallback, or from msg) before calling producerMetrics.recordSendFailed/recordSendSuccess.
|
|
||
| pendingBytesUpDownCounter = ip.newUpDownCounter( | ||
| "pulsar.client.producer.message.pending.size", Unit.Bytes, | ||
| "The size of the messages in the producer internal queue, waiting to sent", |
There was a problem hiding this comment.
Typo/grammar in the metric description: "waiting to sent" should be "waiting to be sent".
| "The size of the messages in the producer internal queue, waiting to sent", | |
| "The size of the messages in the producer internal queue, waiting to be sent", |
|
|
||
| rpcLatencyHistogram = ip.newLatencyHistogram( | ||
| "pulsar.client.producer.rpc.send.duration", | ||
| "Publish RPC latency experienced internally by the client when sending data to receiving an ack", |
There was a problem hiding this comment.
The RPC latency histogram description reads awkwardly ("when sending data to receiving an ack"). Consider rephrasing to something like "when sending data and receiving an ack" for clarity.
| "Publish RPC latency experienced internally by the client when sending data to receiving an ack", | |
| "Publish RPC latency experienced internally by the client when sending data and receiving an ack", |
| producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions, | ||
| "The number of producer sessions closed", topic, Attributes.empty()); | ||
| producerMetrics = new ProducerMetrics(ip, topic); | ||
| rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram(); |
There was a problem hiding this comment.
this isn't consistent with the way how latencyHistogram was handled. I'd assume that rpcLatencyHistogram would be pushed down to ProducerMetrics in a similar way as latencyHistogram.
lhotari
left a comment
There was a problem hiding this comment.
check review comments.
There are most likely CI failures in this PR.
Please setup Pulsar CI in your own fork so that you have full control of running the GitHub Actions workflow, that's explained in https://pulsar.apache.org/contribute/personal-ci/.
CI passes on your side, this PR would be ready for further review.
There are a lot of flaky tests in Pulsar so you'd need to ignore unrelated failures and retry the failing build jobs.
|
|
||
| private ProducerImpl.OpSendMsg createDummyOpSendMsg() { | ||
| return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null); | ||
| return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null); |
There was a problem hiding this comment.
Instead of using null, it's better to have a no-op ProducerMetric instantiated with InstrumentProvider.NOOP. That could be instantiated in a final field in the test class.
| ProducerImpl.OpSendMsg opSendMsg = | ||
| ProducerImpl.OpSendMsg.create( | ||
| LatencyHistogram.NOOP, | ||
| (ProducerMetrics) null, |
There was a problem hiding this comment.
Instead of using null, it's better to have a no-op ProducerMetric instantiated with InstrumentProvider.NOOP. That could be instantiated in a final field in the test class.
Same comment applies to other cases in this test class.
|
|
||
| import io.opentelemetry.api.common.Attributes; | ||
|
|
||
| public class ConsumerMetrics { |
There was a problem hiding this comment.
add javadoc explaining the purpose of this class
|
|
||
| import io.opentelemetry.api.common.Attributes; | ||
|
|
||
| public class ProducerMetrics { |
There was a problem hiding this comment.
add javadoc explaining the purpose of this class
CI pass on my fork: https://github.com/3424672656/pulsar/actions/runs/22962292272/job/66665997391 |
Fixes #xyz
Main Issue: #xyz
PIP: #xyz
Motivation
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
Motivation: The OTel metrics in ProducerImpl/ConsumerImpl are scattered as individual fields, making them hard to test in isolation and inconsistent with the broker-side pattern (OpenTelemetryProducerStats). This PR extracts them into dedicated ProducerMetrics/ConsumerMetrics classes, adds unit tests, and aligns with the existing broker-side design.
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: