
[improve][client] Extract OTel metrics into ProducerMetrics/ConsumerMetrics classes #25309

Open
3424672656 wants to merge 7 commits into apache:master from 3424672656:refactor_client_metrics


@3424672656 3424672656 commented Mar 11, 2026

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

Motivation: The OTel metrics in ProducerImpl/ConsumerImpl are scattered as individual fields, making them hard to test in isolation and inconsistent with the broker-side pattern (OpenTelemetryProducerStats). This PR extracts them into dedicated ProducerMetrics/ConsumerMetrics classes, adds unit tests, and aligns with the existing broker-side design.

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions

@3424672656 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) and removed the doc-label-missing label on Mar 11, 2026
consumerNacksCounter.increment();
}

public void recordDlq() {
Member

Suggested change
- public void recordDlq() {
+ public void recordDlqMessageSent() {

Author

done~

Contributor

Copilot AI left a comment

Pull request overview

Refactors Pulsar client-side OpenTelemetry (OTel) metrics by extracting producer/consumer metric instruments and recording logic out of ProducerImpl/ConsumerImpl into dedicated helper classes, with new unit tests to validate metric emission.

Changes:

  • Introduce ProducerMetrics and ConsumerMetrics wrappers to centralize OTel instrument creation and recording.
  • Update ProducerImpl and ConsumerImpl to delegate metric recording to the new wrappers.
  • Add unit tests for producer/consumer metric recording using InMemoryMetricReader.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java | New producer metrics wrapper (counters + histograms) |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java | New consumer metrics wrapper (counters + receive-queue tracking) |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java | Replace inline metric fields/updates with ProducerMetrics delegation |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | Replace inline metric fields/updates with ConsumerMetrics delegation |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java | New unit tests for producer OTel metrics |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java | New unit tests for consumer OTel metrics |
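
To make the wrapper idea concrete, here is a minimal sketch of what such a class could look like. It is not the code in this PR: `Instrument` is a hypothetical stand-in for the client's OTel instrument types, `recordSendStarted` is an invented hook, and only `recordSendSuccess` and `recordSendFailed` are names that appear in the diff under review.

```java
import java.util.concurrent.atomic.LongAdder;

/** Sketch only: a producer metrics wrapper built on stand-in instruments. */
final class ProducerMetricsSketch {

    /** Hypothetical stand-in for an OTel counter or up-down counter. */
    static final class Instrument {
        private final LongAdder value = new LongAdder();

        void add(long delta) {
            value.add(delta);
        }

        long sum() {
            return value.sum();
        }
    }

    // All instruments live in one place instead of as loose fields on the producer.
    final Instrument pendingMessages = new Instrument();
    final Instrument pendingBytes = new Instrument();
    final Instrument publishedBytes = new Instrument();
    final Instrument sendFailures = new Instrument();
    final Instrument totalLatencyNanos = new Instrument();

    /** Hypothetical hook: a message entered the producer's internal queue. */
    void recordSendStarted(long msgSize) {
        pendingMessages.add(1);
        pendingBytes.add(msgSize);
    }

    /** The message was acknowledged: drain the pending gauges, count the bytes. */
    void recordSendSuccess(long latencyNanos, long msgSize) {
        pendingMessages.add(-1);
        pendingBytes.add(-msgSize);
        publishedBytes.add(msgSize);
        totalLatencyNanos.add(latencyNanos);
    }

    /** The send failed: drain the pending gauges, count the failure. */
    void recordSendFailed(long latencyNanos, long msgSize) {
        pendingMessages.add(-1);
        pendingBytes.add(-msgSize);
        sendFailures.add(1);
        totalLatencyNanos.add(latencyNanos);
    }
}
```

Because the caller only sees intent-named methods, a unit test can drive the wrapper directly and assert on the instruments without constructing a producer.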

Comment on lines 440 to 444
  if (e != null) {
- latencyHistogram.recordFailure(latencyNanos);
+ producerMetrics.recordSendFailed(latencyNanos, msgSize);
  stats.incrementSendFailed();
  onSendAcknowledgement(msg, null, e);
  sendCallback.getFuture().completeExceptionally(e);
Copilot AI Mar 11, 2026

In onSendComplete, the metrics are recorded using the msgSize field from the callback instance that received sendComplete(), not the size associated with sendCallback/msg currently being processed in the loop. For batched sends where callbacks are chained, this can mis-account pending bytes and published bytes (and any size-based metrics) for all but the first message. Consider deriving the size per message (e.g., from the current sendCallback when it’s a DefaultSendMessageCallback, or from msg) before calling producerMetrics.recordSendFailed/recordSendSuccess.
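
The concern above can be illustrated with a small, self-contained sketch. Everything here is hypothetical (`Callback`, `chain`, and both accounting methods are invented for illustration and are not Pulsar classes); it only assumes that callbacks are chained and that each one knows the size of its own message.

```java
/** Sketch only: size accounting over a chain of send callbacks. */
final class ChainedCallbackSketch {

    /** Hypothetical callback node; each node carries its own message size. */
    static final class Callback {
        final long msgSize;
        Callback next;

        Callback(long msgSize) {
            this.msgSize = msgSize;
        }
    }

    /** Builds a linked chain of callbacks from the given sizes. */
    static Callback chain(long... sizes) {
        Callback head = null;
        Callback tail = null;
        for (long size : sizes) {
            Callback cb = new Callback(size);
            if (head == null) {
                head = cb;
            } else {
                tail.next = cb;
            }
            tail = cb;
        }
        return head;
    }

    /** Problematic accounting: every message is charged the head's size. */
    static long bytesUsingHeadSize(Callback head) {
        long total = 0;
        for (Callback cb = head; cb != null; cb = cb.next) {
            total += head.msgSize;
        }
        return total;
    }

    /** Suggested accounting: the size comes from the callback being processed. */
    static long bytesUsingPerMessageSize(Callback head) {
        long total = 0;
        for (Callback cb = head; cb != null; cb = cb.next) {
            total += cb.msgSize;
        }
        return total;
    }
}
```

For a chain with sizes 10, 20, and 30, the first method reports 30 bytes while the second reports the actual 60, which is the kind of drift that would leave a pending-bytes counter permanently off.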

Copilot uses AI. Check for mistakes.

pendingBytesUpDownCounter = ip.newUpDownCounter(
"pulsar.client.producer.message.pending.size", Unit.Bytes,
"The size of the messages in the producer internal queue, waiting to sent",
Copilot AI Mar 11, 2026

Typo/grammar in the metric description: "waiting to sent" should be "waiting to be sent".

Suggested change
- "The size of the messages in the producer internal queue, waiting to sent",
+ "The size of the messages in the producer internal queue, waiting to be sent",


rpcLatencyHistogram = ip.newLatencyHistogram(
"pulsar.client.producer.rpc.send.duration",
"Publish RPC latency experienced internally by the client when sending data to receiving an ack",
Copilot AI Mar 11, 2026

The RPC latency histogram description reads awkwardly ("when sending data to receiving an ack"). Consider rephrasing to something like "when sending data and receiving an ack" for clarity.

Suggested change
- "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
+ "Publish RPC latency experienced internally by the client when sending data and receiving an ack",

producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
"The number of producer sessions closed", topic, Attributes.empty());
producerMetrics = new ProducerMetrics(ip, topic);
rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram();
Member

This isn't consistent with how latencyHistogram was handled. I'd expect rpcLatencyHistogram to be pushed down into ProducerMetrics in the same way as latencyHistogram.
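
One possible reading of this suggestion, sketched with stand-in types: the wrapper keeps both histograms private and exposes only record methods, so the producer never holds a histogram reference of its own. `Histogram`, `Metrics`, `recordRpcLatency`, and the accessor names are hypothetical and chosen for illustration; the PR may name things differently.

```java
import java.util.concurrent.atomic.LongAdder;

/** Sketch only: both latency histograms owned by the metrics wrapper. */
final class RpcLatencySketch {

    /** Hypothetical stand-in for a latency histogram. */
    static final class Histogram {
        private final LongAdder samples = new LongAdder();
        private final LongAdder totalNanos = new LongAdder();

        void record(long nanos) {
            samples.increment();
            totalNanos.add(nanos);
        }

        long samples() {
            return samples.sum();
        }

        long totalNanos() {
            return totalNanos.sum();
        }
    }

    /** Wrapper that hides both histograms behind record methods (no getters). */
    static final class Metrics {
        private final Histogram latency = new Histogram();
        private final Histogram rpcLatency = new Histogram();

        void recordSendSuccess(long latencyNanos) {
            latency.record(latencyNanos);
        }

        /** Hypothetical method replacing a getRpcLatencyHistogram() accessor. */
        void recordRpcLatency(long rpcNanos) {
            rpcLatency.record(rpcNanos);
        }

        long sendSamples() {
            return latency.samples();
        }

        long rpcSamples() {
            return rpcLatency.samples();
        }

        long rpcTotalNanos() {
            return rpcLatency.totalNanos();
        }
    }
}
```

With this shape, the two histograms are handled symmetrically and the producer code only calls `recordRpcLatency(...)` at the point where the ack arrives.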

Author

done~

Member

@lhotari lhotari left a comment

Check the review comments.

There are most likely CI failures in this PR.
Please set up Pulsar CI in your own fork so that you have full control of running the GitHub Actions workflow; that's explained in https://pulsar.apache.org/contribute/personal-ci/.
Once CI passes on your side, this PR would be ready for further review.
There are a lot of flaky tests in Pulsar, so you'd need to ignore unrelated failures and retry the failing build jobs.


  private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
- return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null);
+ return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null);
Member

Instead of using null, it's better to use a no-op ProducerMetrics instantiated with InstrumentProvider.NOOP. It could be held in a final field in the test class.

  ProducerImpl.OpSendMsg opSendMsg =
  ProducerImpl.OpSendMsg.create(
- LatencyHistogram.NOOP,
+ (ProducerMetrics) null,
Member

Instead of using null, it's better to use a no-op ProducerMetrics instantiated with InstrumentProvider.NOOP. It could be held in a final field in the test class.

Same comment applies to other cases in this test class.
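
The reasoning behind preferring a no-op instance over null can be sketched as follows. This is an illustration with invented types (`Recorder`, `NOOP_RECORDER`, `Metrics`, `completeSend`), not the Pulsar `InstrumentProvider` API; it only assumes that the code under test calls a record method on whatever metrics object it is handed.

```java
/** Sketch only: a no-op metrics object versus a null reference in tests. */
final class NoopMetricsSketch {

    /** Hypothetical instrument abstraction. */
    interface Recorder {
        void add(long delta);
    }

    /** No-op recorder: accepts every update and discards it. */
    static final Recorder NOOP_RECORDER = delta -> { };

    /** Hypothetical metrics wrapper that delegates to the recorder it was given. */
    static final class Metrics {
        private final Recorder sendFailures;

        Metrics(Recorder sendFailures) {
            this.sendFailures = sendFailures;
        }

        void recordSendFailed() {
            sendFailures.add(1);
        }
    }

    /** Stand-in for production code that always reports to its metrics object. */
    static boolean completeSend(Metrics metrics, boolean failed) {
        if (failed) {
            metrics.recordSendFailed();
        }
        return !failed;
    }
}
```

A test that keeps `new NoopMetricsSketch.Metrics(NoopMetricsSketch.NOOP_RECORDER)` in a final field can exercise the failure path safely, whereas passing null would surface as a `NullPointerException` as soon as that path records a metric.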


import io.opentelemetry.api.common.Attributes;

public class ConsumerMetrics {
Member

Add Javadoc explaining the purpose of this class.


import io.opentelemetry.api.common.Attributes;

public class ProducerMetrics {
Member

Add Javadoc explaining the purpose of this class.

@3424672656
Author

> Check the review comments.
>
> There are most likely CI failures in this PR. Please set up Pulsar CI in your own fork so that you have full control of running the GitHub Actions workflow; that's explained in https://pulsar.apache.org/contribute/personal-ci/. Once CI passes on your side, this PR would be ready for further review. There are a lot of flaky tests in Pulsar, so you'd need to ignore unrelated failures and retry the failing build jobs.

CI passes on my fork: https://github.com/3424672656/pulsar/actions/runs/22962292272/job/66665997391


Labels

doc-not-needed (Your PR changes do not impact docs), ready-to-test


3 participants