HYPERFLEET-469: Add Prometheus metrics with hyperfleet_broker_ prefix#14

Open
rafabene wants to merge 4 commits into openshift-hyperfleet:main from rafabene:HYPERFLEET-469

@rafabene rafabene commented Feb 25, 2026

Summary

  • Add MetricsRecorder with four Prometheus metrics following the HyperFleet Metrics Standard (hyperfleet_broker_ prefix)
  • Instrument Publisher.Publish() with published counter and error counter
  • Instrument Subscriber.Subscribe() handler with consumed counter, duration histogram, and error counters (handler/conversion)
  • Metrics are optional via SetMetrics() on both Publisher and Subscriber interfaces, maintaining full backwards compatibility

Metrics

| Metric | Type | Labels |
| --- | --- | --- |
| hyperfleet_broker_messages_consumed_total | Counter | topic, component, version |
| hyperfleet_broker_messages_published_total | Counter | topic, component, version |
| hyperfleet_broker_errors_total | Counter | topic, error_type, component, version |
| hyperfleet_broker_message_duration_seconds | Histogram | topic, component, version |

Test plan

  • Unit tests for metric registration, counters, histogram, and nil-safety
  • Integration tests for publisher metrics (successful publish, failed publish)
  • SetMetrics tests for both Publisher and Subscriber
  • Nil MetricsRecorder does not panic (backwards compatibility)
  • make test passes
  • make lint passes (0 issues)
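
The nil-safety item in the test plan relies on Go allowing method calls on a nil pointer receiver. The following is a minimal, stdlib-only sketch of that pattern, not the PR's code: `recorder`, `newRecorder`, and `Published` are hypothetical stand-ins, and the real MetricsRecorder wraps Prometheus collectors rather than a map.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical stand-in for MetricsRecorder; it counts
// published messages per topic in a map instead of a Prometheus CounterVec.
type recorder struct {
	mu        sync.Mutex
	published map[string]int
}

func newRecorder() *recorder {
	return &recorder{published: make(map[string]int)}
}

// RecordPublished is safe to call on a nil *recorder: the nil check runs
// before any field is touched, so an unconfigured caller is a no-op.
func (r *recorder) RecordPublished(topic string) {
	if r == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.published[topic]++
}

// Published returns the count for a topic, or 0 for a nil recorder.
func (r *recorder) Published(topic string) int {
	if r == nil {
		return 0
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.published[topic]
}

func main() {
	var disabled *recorder // metrics not configured
	disabled.RecordPublished("clusters")

	enabled := newRecorder()
	enabled.RecordPublished("clusters")

	fmt.Println(disabled.Published("clusters"), enabled.Published("clusters"))
}
```

With this shape, callers never need an `if metrics != nil` guard at each call site; the guard lives once inside each method.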

Jira: https://issues.redhat.com/browse/HYPERFLEET-469

Summary by CodeRabbit

  • New Features

    • Integrated Prometheus metrics for broker: record consumed, published, errors, and processing duration with topic/component/version labels. Publishers and subscribers accept an optional, nil-safe metrics recorder to enable instrumentation.
  • Tests

    • Added comprehensive tests for metric registration, labeling, counters/histograms, error paths, nil-safe behavior, and publish/subscribe integration.
  • Documentation

    • Added README section with metrics usage, examples, exported metrics, error types, and duration buckets.

Add MetricsRecorder with four metrics per the HyperFleet Metrics Standard:
- hyperfleet_broker_messages_consumed_total (counter)
- hyperfleet_broker_messages_published_total (counter)
- hyperfleet_broker_errors_total (counter, with error_type label)
- hyperfleet_broker_message_duration_seconds (histogram)

Metrics are optional via SetMetrics() on Publisher/Subscriber interfaces,
maintaining full backwards compatibility.

openshift-ci bot commented Feb 25, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign yasun1 for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


coderabbitai bot commented Feb 25, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 95cc221 and 383ec36.

📒 Files selected for processing (14)
  • README.md
  • broker/broker.go
  • broker/error_test.go
  • broker/metrics_test.go
  • broker/options_test.go
  • broker/publisher.go
  • broker/subscriber.go
  • example/cmd/publisher/main.go
  • example/cmd/subscriber/main.go
  • test/integration/broker_leak_test.go
  • test/integration/broker_perf_test.go
  • test/integration/common/common.go
  • test/integration/googlepubsub/googlepubsub_test.go
  • test/integration/rabbitmq/rabbitmq_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • broker/metrics_test.go

Walkthrough

Adds Prometheus instrumentation to the broker package via a new exported MetricsRecorder type and constructor NewMetricsRecorder(component, version, registerer). Publisher and Subscriber concrete types gain an optional metrics field (nullable); NewPublisher and NewSubscriber signatures now accept a *MetricsRecorder parameter and set it on created objects. Publish and Subscribe flows record consumed/published counters, error counters (conversion, publish, handler), and handler duration histograms. Tests, README, and go.mod updated to cover usage and dependencies.
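
The subscribe-side flow summarized in the walkthrough can be sketched with the standard library alone. This is an illustration under assumptions, not the code from broker/subscriber.go: `fakeMetrics`, `process`, and the convert/handle helpers are hypothetical, and the sketch assumes the duration covers only the handler call.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fakeMetrics is a hypothetical in-memory recorder used only for this sketch.
type fakeMetrics struct {
	consumed  int
	durations int
	errs      map[string]int
}

func newFakeMetrics() *fakeMetrics {
	return &fakeMetrics{errs: make(map[string]int)}
}

func (m *fakeMetrics) RecordConsumed(topic string)                  { m.consumed++ }
func (m *fakeMetrics) RecordDuration(topic string, d time.Duration) { m.durations++ }
func (m *fakeMetrics) RecordError(topic, errorType string)          { m.errs[errorType]++ }

// process counts the message as consumed, converts it, runs the handler,
// and records the handler duration plus any conversion or handler error.
func process(m *fakeMetrics, topic string, raw []byte,
	convert func([]byte) (string, error), handle func(string) error) error {
	m.RecordConsumed(topic)

	event, err := convert(raw)
	if err != nil {
		m.RecordError(topic, "conversion")
		return err
	}

	start := time.Now()
	err = handle(event)
	m.RecordDuration(topic, time.Since(start))
	if err != nil {
		m.RecordError(topic, "handler")
	}
	return err
}

// Hypothetical converters and handlers for exercising each path.
func okConvert(raw []byte) (string, error)  { return string(raw), nil }
func badConvert(raw []byte) (string, error) { return "", errors.New("malformed event") }
func okHandle(event string) error           { return nil }
func badHandle(event string) error          { return errors.New("handler failed") }

func main() {
	m := newFakeMetrics()
	_ = process(m, "clusters", []byte("a"), okConvert, okHandle)
	_ = process(m, "clusters", []byte("b"), badConvert, okHandle)
	_ = process(m, "clusters", []byte("c"), okConvert, badHandle)
	fmt.Printf("consumed=%d durations=%d conversion=%d handler=%d\n",
		m.consumed, m.durations, m.errs["conversion"], m.errs["handler"])
}
```

Every message increments the consumed counter, including ones that later fail conversion, so the error counter can be read as a fraction of consumed messages per topic.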

Sequence Diagram(s)

sequenceDiagram
    participant Code
    participant MetricsRecorder
    participant Prometheus as Prometheus Registry

    Code->>MetricsRecorder: NewMetricsRecorder(component, version, registerer)
    activate MetricsRecorder
    MetricsRecorder->>MetricsRecorder: Create CounterVec (messages_consumed)
    MetricsRecorder->>Prometheus: Register messages_consumed
    MetricsRecorder->>MetricsRecorder: Create CounterVec (messages_published)
    MetricsRecorder->>Prometheus: Register messages_published
    MetricsRecorder->>MetricsRecorder: Create CounterVec (errors)
    MetricsRecorder->>Prometheus: Register errors
    MetricsRecorder->>MetricsRecorder: Create HistogramVec (message_duration)
    MetricsRecorder->>Prometheus: Register message_duration
    deactivate MetricsRecorder
    MetricsRecorder-->>Code: *MetricsRecorder
sequenceDiagram
    participant Client
    participant Publisher
    participant MetricsRecorder
    participant Watermill as Watermill Publisher

    Client->>Publisher: SetMetrics(recorder)
    Publisher->>Publisher: store metrics

    Client->>Publisher: Publish(topic, msg)
    activate Publisher
    Publisher->>Watermill: Publish message
    alt success
        Watermill-->>Publisher: OK
        Publisher->>MetricsRecorder: RecordPublished(topic)
    else failure
        Watermill-->>Publisher: Error
        Publisher->>MetricsRecorder: RecordError(topic, "publish")
    end
    deactivate Publisher
sequenceDiagram
    participant MessageSource
    participant Subscriber
    participant MetricsRecorder
    participant Handler

    MessageSource->>Subscriber: Message arrives
    activate Subscriber
    Subscriber->>MetricsRecorder: RecordConsumed(topic)
    Subscriber->>Subscriber: convert/parse message
    alt convert success
        Subscriber->>Handler: handle message
        activate Handler
        Handler-->>Subscriber: result / error
        deactivate Handler
    else convert fail
        Subscriber->>MetricsRecorder: RecordError(topic, "conversion")
    end
    Subscriber->>MetricsRecorder: RecordDuration(topic, duration)
    alt handler error
        Subscriber->>MetricsRecorder: RecordError(topic, "handler")
    end
    deactivate Subscriber

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

lgtm, approved

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 51.28% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly identifies the primary change: adding Prometheus metrics with the hyperfleet_broker_ prefix, which aligns with the main objective of instrumenting the broker with metrics. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
broker/publisher.go (1)

48-52: ⚠️ Potential issue | 🟡 Minor

Conversion errors during publish are not recorded in metrics.

When eventToMessage fails (Line 49), the method returns early without calling RecordError. The subscriber records "conversion" errors for the analogous path. For consistency and observability, consider recording this error type here as well.

Suggested fix
 	msg, err := eventToMessage(event)
 	if err != nil {
 		p.logger.Errorf(ctx, "Failed to convert CloudEvent to message: %v", err)
+		p.metrics.RecordError(topic, "conversion")
 		return err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@broker/publisher.go` around lines 48 - 52, The publish path currently returns
immediately when eventToMessage fails (observed at the eventToMessage call and
p.logger.Errorf), so add a metrics error record before returning: call
p.metrics.RecordError(topic, "conversion") (same error label used by the
subscriber) right after logging the error and before the return in the Publish
(or corresponding) method where eventToMessage is invoked; ensure you import/use
the same metrics instance p.metrics and preserve the existing log message.
🧹 Nitpick comments (3)
broker/metrics.go (1)

59-64: MustRegister will panic on duplicate registration — risky for a library.

Since this is a library consumed by other services, calling NewMetricsRecorder twice with the same (or default) registerer will panic. Consider using prometheus.Register and handling the AlreadyRegisteredError gracefully, or documenting clearly that callers must create only one MetricsRecorder per registerer.

Suggested safer alternative
-	registerer.MustRegister(
-		m.messagesConsumed,
-		m.messagesPublished,
-		m.errors,
-		m.messageDuration,
-	)
-
-	return m
+	collectors := []prometheus.Collector{
+		m.messagesConsumed,
+		m.messagesPublished,
+		m.errors,
+		m.messageDuration,
+	}
+	for _, c := range collectors {
+		if err := registerer.Register(c); err != nil {
+			// If already registered, that's acceptable — reuse existing.
+			var are prometheus.AlreadyRegisteredError
+			if !errors.As(err, &are) {
+				panic(fmt.Sprintf("failed to register metric: %v", err))
+			}
+		}
+	}
+
+	return m

(This requires adding "errors" and "fmt" to the import block.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@broker/metrics.go` around lines 59 - 64, The current use of
registerer.MustRegister in NewMetricsRecorder will panic on duplicate
registration; change to registering each metric with registerer.Register (or
prometheus.Register) and handle prometheus.AlreadyRegisteredError: for each
metric in m (messagesConsumed, messagesPublished, errors, messageDuration) call
Register and if you get an AlreadyRegisteredError, retrieve and reuse the
existing collector (use the error's ExistingCollector) or simply ignore the
error so the function doesn't panic; ensure NewMetricsRecorder documents that it
will reuse existing collectors when duplicates are encountered.
broker/subscriber.go (1)

185-188: SetMetrics is not thread-safe with concurrent Subscribe calls.

If SetMetrics is called while a Subscribe handler goroutine is running, there's a data race on s.metrics. This is low risk if the intended usage is to call SetMetrics once during initialization before any Subscribe calls, but it's worth documenting this expectation in the method's godoc.
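
If runtime safety were preferred over a documentation-only fix, one option is to guard the field with a `sync.RWMutex`. The sketch below is an assumption about how that could look, not the PR's implementation: the `subscriber` and `recorder` types are reduced to the one field that matters, and `currentMetrics` is a hypothetical accessor that handler goroutines would call instead of reading `s.metrics` directly.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical placeholder for *MetricsRecorder.
type recorder struct{ name string }

// subscriber is reduced to the metrics field and the lock that guards it.
type subscriber struct {
	mu      sync.RWMutex
	metrics *recorder
}

// SetMetrics swaps the recorder under the write lock.
func (s *subscriber) SetMetrics(m *recorder) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.metrics = m
}

// currentMetrics returns the recorder under the read lock, so concurrent
// handlers never race with SetMetrics.
func (s *subscriber) currentMetrics() *recorder {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.metrics
}

func main() {
	s := &subscriber{}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.currentMetrics() // simulated handler read
		}()
	}
	s.SetMetrics(&recorder{name: "adapter"})
	wg.Wait()

	fmt.Println(s.currentMetrics().name)
}
```

The trade-off is a lock acquisition on every handled message; requiring the recorder at construction time avoids the shared write entirely.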

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@broker/subscriber.go` around lines 185 - 188, The SetMetrics method writes
s.metrics without synchronization, causing a potential data race if called
concurrently with Subscribe goroutines; update the SetMetrics godoc on
subscriber.SetMetrics to state explicitly that it is not thread-safe and must be
called during initialization (before any calls to Subscribe) or, alternatively,
if you want runtime safety, protect access to s.metrics with a mutex used by
both SetMetrics and Subscribe (referencing the subscriber type, SetMetrics,
Subscribe and field s.metrics). Ensure the godoc clearly documents the expected
usage (call once at startup) if you choose the documentation-only approach.
broker/metrics_test.go (1)

233-246: Consider adding a subscriber handler integration test.

TestSetMetricsOnSubscriber only verifies that SetMetrics assigns the field. Unlike the publisher tests (which exercise Publish end-to-end and assert counter values), there's no test that processes a message through the subscriber handler and verifies RecordConsumed, RecordDuration, or RecordError are actually called. This would strengthen confidence in the subscriber instrumentation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@broker/metrics_test.go` around lines 233 - 246, Add an integration test that
wires a subscriber instance with a real MetricsRecorder (use NewMetricsRecorder
with a prometheus.Registry) and sends a message through the subscriber message
handler method (the subscriber struct created in TestSetMetricsOnSubscriber and
its message handling entrypoint, e.g., the subscriber's Handle/handle/handler
function) to exercise successful and error paths; for a success case assert the
metrics registry shows RecordConsumed and RecordDuration updated, and for an
error case simulate the handler returning an error and assert RecordError and
the error-related metrics changed. Create the test similarly to existing
publisher end-to-end tests: set s.SetMetrics(metrics), invoke the handler with a
mock message/context, wait for processing, then query the prometheus.Registry
(or testutil) to verify the counters/histogram values for RecordConsumed,
RecordDuration and RecordError are as expected.
ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 024ec6b and 8183484.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • broker/metrics.go
  • broker/metrics_test.go
  • broker/publisher.go
  • broker/subscriber.go
  • go.mod

messagesConsumed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "hyperfleet_broker_messages_consumed_total",
Help: "Total number of messages consumed from the broker.",
}, []string{"topic", "component", "version"}),
Contributor

Should we add the subscriberId as a label?
I guess there should be a correspondence between the component and the subscriberId, so this may not be a strong requirement after all, and it may just duplicate information.

Contributor Author

Agreed — I think component + topic already gives enough granularity for observability. The subscriberId is more of a routing/infrastructure concern (queue naming in RabbitMQ, subscription identity in Pub/Sub) than a metric dimension. Adding it would also increase Prometheus cardinality and only apply to consumer metrics, creating an asymmetry with publisher metrics. If someone needs to distinguish multiple subscribers within the same component, they can use different component values when creating the MetricsRecorder.
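
To make the cardinality point concrete: the number of time series a metric can produce is bounded by the product of the distinct values of each label. The figures below are hypothetical and chosen only for illustration; `seriesUpperBound` is not part of the library.

```go
package main

import "fmt"

// seriesUpperBound returns the maximum number of time series one metric can
// produce: the product of the distinct-value counts of its labels.
func seriesUpperBound(labelCardinalities ...int) int {
	total := 1
	for _, n := range labelCardinalities {
		total *= n
	}
	return total
}

func main() {
	// Hypothetical figures: 5 topics, 3 components, 2 versions.
	base := seriesUpperBound(5, 3, 2)

	// Same metric with an extra subscriberId label taking 4 distinct values.
	withSubscriber := seriesUpperBound(5, 3, 2, 4)

	fmt.Println(base, withSubscriber)
}
```

This is an upper bound: only label combinations that are actually observed create series, so the real count is usually lower.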

@rh-amarin
Contributor

Should we add this info about what is being monitored to the README?

@rafabene
Contributor Author

Good call — added a "Prometheus Metrics" section to the README in 95cc221 covering usage, exported metrics, error types, and duration buckets.

Move MetricsRecorder from an optional SetMetrics() call to a required
constructor parameter on NewPublisher and NewSubscriber. This removes
SetMetrics from the public interfaces and ensures metrics are wired
from creation time. Passing nil disables instrumentation (all
RecordX methods are already nil-safe).

This is a breaking API change (new required parameter), so the release
should be v1.1.0 instead of v1.0.3.
@rafabene
Contributor Author

/retest
