HYPERFLEET-469: Add Prometheus metrics with hyperfleet_broker_ prefix#14
HYPERFLEET-469: Add Prometheus metrics with hyperfleet_broker_ prefix#14rafabene wants to merge 4 commits intoopenshift-hyperfleet:mainfrom
Conversation
Add MetricsRecorder with four metrics per the HyperFleet Metrics Standard: - hyperfleet_broker_messages_consumed_total (counter) - hyperfleet_broker_messages_published_total (counter) - hyperfleet_broker_errors_total (counter, with error_type label) - hyperfleet_broker_message_duration_seconds (histogram) Metrics are optional via SetMetrics() on Publisher/Subscriber interfaces, maintaining full backwards compatibility.
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review infoConfiguration used: Organization UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (14)
🚧 Files skipped from review as they are similar to previous changes (1)
WalkthroughAdds Prometheus instrumentation to the broker package via a new exported MetricsRecorder type and constructor NewMetricsRecorder(component, version, registerer). Publisher and Subscriber concrete types gain an optional metrics field (nullable); NewPublisher and NewSubscriber signatures now accept a *MetricsRecorder parameter and set it on created objects. Publish and Subscribe flows record consumed/published counters, error counters (conversion, publish, handler), and handler duration histograms. Tests, README, and go.mod updated to cover usage and dependencies. Sequence Diagram(s)sequenceDiagram
participant Code
participant MetricsRecorder
participant Prometheus as Prometheus Registry
Code->>MetricsRecorder: NewMetricsRecorder(component, version, registerer)
activate MetricsRecorder
MetricsRecorder->>MetricsRecorder: Create CounterVec (messages_consumed)
MetricsRecorder->>Prometheus: Register messages_consumed
MetricsRecorder->>MetricsRecorder: Create CounterVec (messages_published)
MetricsRecorder->>Prometheus: Register messages_published
MetricsRecorder->>MetricsRecorder: Create CounterVec (errors)
MetricsRecorder->>Prometheus: Register errors
MetricsRecorder->>MetricsRecorder: Create HistogramVec (message_duration)
MetricsRecorder->>Prometheus: Register message_duration
deactivate MetricsRecorder
MetricsRecorder-->>Code: *MetricsRecorder
sequenceDiagram
participant Client
participant Publisher
participant MetricsRecorder
participant Watermill as Watermill Publisher
Client->>Publisher: SetMetrics(recorder)
Publisher->>Publisher: store metrics
Client->>Publisher: Publish(topic, msg)
activate Publisher
Publisher->>Watermill: Publish message
alt success
Watermill-->>Publisher: OK
Publisher->>MetricsRecorder: RecordPublished(topic)
else failure
Watermill-->>Publisher: Error
Publisher->>MetricsRecorder: RecordError(topic, "publish")
end
deactivate Publisher
sequenceDiagram
participant MessageSource
participant Subscriber
participant MetricsRecorder
participant Handler
MessageSource->>Subscriber: Message arrives
activate Subscriber
Subscriber->>MetricsRecorder: RecordConsumed(topic)
Subscriber->>Subscriber: convert/parse message
alt convert success
Subscriber->>Handler: handle message
activate Handler
Handler-->>Subscriber: result / error
deactivate Handler
else convert fail
Subscriber->>MetricsRecorder: RecordError(topic, "conversion")
end
Subscriber->>MetricsRecorder: RecordDuration(topic, duration)
alt handler error
Subscriber->>MetricsRecorder: RecordError(topic, "handler")
end
deactivate Subscriber
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
broker/publisher.go (1)
48-52:⚠️ Potential issue | 🟡 MinorConversion errors during publish are not recorded in metrics.
When
eventToMessagefails (Line 49), the method returns early without callingRecordError. The subscriber records"conversion"errors for the analogous path. For consistency and observability, consider recording this error type here as well.Suggested fix
msg, err := eventToMessage(event) if err != nil { p.logger.Errorf(ctx, "Failed to convert CloudEvent to message: %v", err) + p.metrics.RecordError(topic, "conversion") return err }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@broker/publisher.go` around lines 48 - 52, The publish path currently returns immediately when eventToMessage fails (observed at the eventToMessage call and p.logger.Errorf), so add a metrics error record before returning: call p.metrics.RecordError(ctx, "conversion") (same error label used by the subscriber) right after logging the error and before the return in the Publish (or corresponding) method where eventToMessage is invoked; ensure you import/use the same metrics instance p.metrics and preserve the existing log message.
🧹 Nitpick comments (3)
broker/metrics.go (1)
59-64:MustRegisterwill panic on duplicate registration — risky for a library.Since this is a library consumed by other services, calling
NewMetricsRecordertwice with the same (or default) registerer will panic. Consider usingprometheus.Registerand handling theAlreadyRegisteredErrorgracefully, or documenting clearly that callers must create only oneMetricsRecorderper registerer.Suggested safer alternative
- registerer.MustRegister( - m.messagesConsumed, - m.messagesPublished, - m.errors, - m.messageDuration, - ) - - return m + collectors := []prometheus.Collector{ + m.messagesConsumed, + m.messagesPublished, + m.errors, + m.messageDuration, + } + for _, c := range collectors { + if err := registerer.Register(c); err != nil { + // If already registered, that's acceptable — reuse existing. + var are prometheus.AlreadyRegisteredError + if !errors.As(err, &are) { + panic(fmt.Sprintf("failed to register metric: %v", err)) + } + } + } + + return m(This requires adding
"errors"and"fmt"to the import block.)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@broker/metrics.go` around lines 59 - 64, The current use of registerer.MustRegister in NewMetricsRecorder will panic on duplicate registration; change to registering each metric with registerer.Register (or prometheus.Register) and handle prometheus.AlreadyRegisteredError: for each metric in m (messagesConsumed, messagesPublished, errors, messageDuration) call Register and if you get an AlreadyRegisteredError, retrieve and reuse the existing collector (use the error's ExistingCollector) or simply ignore the error so the function doesn't panic; ensure NewMetricsRecorder documents that it will reuse existing collectors when duplicates are encountered.broker/subscriber.go (1)
185-188:SetMetricsis not thread-safe with concurrentSubscribecalls.If
SetMetricsis called while aSubscribehandler goroutine is running, there's a data race ons.metrics. This is low risk if the intended usage is to callSetMetricsonce during initialization before anySubscribecalls, but it's worth documenting this expectation in the method's godoc.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@broker/subscriber.go` around lines 185 - 188, The SetMetrics method writes s.metrics without synchronization, causing a potential data race if called concurrently with Subscribe goroutines; update the SetMetrics godoc on subscriber.SetMetrics to state explicitly that it is not thread-safe and must be called during initialization (before any calls to Subscribe) or, alternatively, if you want runtime safety, protect access to s.metrics with a mutex used by both SetMetrics and Subscribe (referencing the subscriber type, SetMetrics, Subscribe and field s.metrics). Ensure the godoc clearly documents the expected usage (call once at startup) if you choose the documentation-only approach.broker/metrics_test.go (1)
233-246: Consider adding a subscriber handler integration test.
TestSetMetricsOnSubscriberonly verifies thatSetMetricsassigns the field. Unlike the publisher tests (which exercisePublishend-to-end and assert counter values), there's no test that processes a message through the subscriber handler and verifiesRecordConsumed,RecordDuration, orRecordErrorare actually called. This would strengthen confidence in the subscriber instrumentation.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@broker/metrics_test.go` around lines 233 - 246, Add an integration test that wires a subscriber instance with a real MetricsRecorder (use NewMetricsRecorder with a prometheus.Registry) and sends a message through the subscriber message handler method (the subscriber struct created in TestSetMetricsOnSubscriber and its message handling entrypoint, e.g., the subscriber's Handle/handle/handler function) to exercise successful and error paths; for a success case assert the metrics registry shows RecordConsumed and RecordDuration updated, and for an error case simulate the handler returning an error and assert RecordError and the error-related metrics changed. Create the test similarly to existing publisher end-to-end tests: set s.SetMetrics(metrics), invoke the handler with a mock message/context, wait for processing, then query the prometheus.Registry (or testutil) to verify the counters/histogram values for RecordConsumed, RecordDuration and RecordError are as expected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@broker/publisher.go`:
- Around line 48-52: The publish path currently returns immediately when
eventToMessage fails (observed at the eventToMessage call and p.logger.Errorf),
so add a metrics error record before returning: call p.metrics.RecordError(ctx,
"conversion") (same error label used by the subscriber) right after logging the
error and before the return in the Publish (or corresponding) method where
eventToMessage is invoked; ensure you import/use the same metrics instance
p.metrics and preserve the existing log message.
---
Nitpick comments:
In `@broker/metrics_test.go`:
- Around line 233-246: Add an integration test that wires a subscriber instance
with a real MetricsRecorder (use NewMetricsRecorder with a prometheus.Registry)
and sends a message through the subscriber message handler method (the
subscriber struct created in TestSetMetricsOnSubscriber and its message handling
entrypoint, e.g., the subscriber's Handle/handle/handler function) to exercise
successful and error paths; for a success case assert the metrics registry shows
RecordConsumed and RecordDuration updated, and for an error case simulate the
handler returning an error and assert RecordError and the error-related metrics
changed. Create the test similarly to existing publisher end-to-end tests: set
s.SetMetrics(metrics), invoke the handler with a mock message/context, wait for
processing, then query the prometheus.Registry (or testutil) to verify the
counters/histogram values for RecordConsumed, RecordDuration and RecordError are
as expected.
In `@broker/metrics.go`:
- Around line 59-64: The current use of registerer.MustRegister in
NewMetricsRecorder will panic on duplicate registration; change to registering
each metric with registerer.Register (or prometheus.Register) and handle
prometheus.AlreadyRegisteredError: for each metric in m (messagesConsumed,
messagesPublished, errors, messageDuration) call Register and if you get an
AlreadyRegisteredError, retrieve and reuse the existing collector (use the
error's ExistingCollector) or simply ignore the error so the function doesn't
panic; ensure NewMetricsRecorder documents that it will reuse existing
collectors when duplicates are encountered.
In `@broker/subscriber.go`:
- Around line 185-188: The SetMetrics method writes s.metrics without
synchronization, causing a potential data race if called concurrently with
Subscribe goroutines; update the SetMetrics godoc on subscriber.SetMetrics to
state explicitly that it is not thread-safe and must be called during
initialization (before any calls to Subscribe) or, alternatively, if you want
runtime safety, protect access to s.metrics with a mutex used by both SetMetrics
and Subscribe (referencing the subscriber type, SetMetrics, Subscribe and field
s.metrics). Ensure the godoc clearly documents the expected usage (call once at
startup) if you choose the documentation-only approach.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sumis excluded by!**/*.sum
📒 Files selected for processing (5)
broker/metrics.gobroker/metrics_test.gobroker/publisher.gobroker/subscriber.gogo.mod
| messagesConsumed: prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
| Name: "hyperfleet_broker_messages_consumed_total", | ||
| Help: "Total number of messages consumed from the broker.", | ||
| }, []string{"topic", "component", "version"}), |
There was a problem hiding this comment.
Should we log the subscriberId as a label?
I guess that there should be a correspondence between the component and the subscriberId, so this may not be a strong requisite after all, and maybe is just duplicating information
There was a problem hiding this comment.
Agreed — I think component + topic already gives enough granularity for observability. The subscriberId is more of a routing/infrastructure concern (queue naming in RabbitMQ, subscription identity in Pub/Sub) than a metric dimension. Adding it would also increase Prometheus cardinality and only apply to consumer metrics, creating an asymmetry with publisher metrics. If someone needs to distinguish multiple subscribers within the same component, they can use different component values when creating the MetricsRecorder.
…tMetrics thread-safety
|
Should we add this info about what is being monitored to the README? |
|
Good call — added a "Prometheus Metrics" section to the README in 95cc221 covering usage, exported metrics, error types, and duration buckets. |
Move MetricsRecorder from an optional SetMetrics() call to a required constructor parameter on NewPublisher and NewSubscriber. This removes SetMetrics from the public interfaces and ensures metrics are wired from creation time. Passing nil disables instrumentation (all RecordX methods are already nil-safe). This is a breaking API change (new required parameter), so the release should be v1.1.0 instead of v1.0.3.
|
/retest |
Summary
MetricsRecorderwith four Prometheus metrics following the HyperFleet Metrics Standard (hyperfleet_broker_prefix)Publisher.Publish()with published counter and error counterSubscriber.Subscribe()handler with consumed counter, duration histogram, and error counters (handler/conversion)SetMetrics()on bothPublisherandSubscriberinterfaces, maintaining full backwards compatibilityMetrics
hyperfleet_broker_messages_consumed_totaltopic,component,versionhyperfleet_broker_messages_published_totaltopic,component,versionhyperfleet_broker_errors_totaltopic,error_type,component,versionhyperfleet_broker_message_duration_secondstopic,component,versionTest plan
SetMetricstests for both Publisher and Subscribermake testpassesmake lintpasses (0 issues)Jira: https://issues.redhat.com/browse/HYPERFLEET-469
Summary by CodeRabbit
New Features
Tests
Documentation