Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
// Default: reserve at least half the workers for non-clone jobs.
maxClones = max(1, config.Concurrency/2)
}
m, err := newSchedulerMetrics()
if err != nil {
return nil, errors.Wrap(err, "create scheduler metrics")
}
m := newSchedulerMetrics()
q := &RootScheduler{
workAvailable: make(chan bool, 1024),
active: make(map[string]string),
Expand Down
44 changes: 9 additions & 35 deletions internal/jobscheduler/metrics.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package jobscheduler

import (
"github.com/alecthomas/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/block/cachew/internal/metrics"
)

type schedulerMetrics struct {
Expand All @@ -14,40 +15,13 @@ type schedulerMetrics struct {
jobDuration metric.Float64Histogram
}

func newSchedulerMetrics() (*schedulerMetrics, error) {
func newSchedulerMetrics() *schedulerMetrics {
meter := otel.Meter("cachew.scheduler")
m := &schedulerMetrics{}
var err error

if m.queueDepth, err = meter.Int64Gauge("cachew.scheduler.queue_depth",
metric.WithDescription("Number of jobs waiting in the scheduler queue"),
metric.WithUnit("{jobs}")); err != nil {
return nil, errors.Wrap(err, "create queue_depth gauge")
}

if m.activeWorkers, err = meter.Int64Gauge("cachew.scheduler.active_workers",
metric.WithDescription("Number of workers currently executing jobs"),
metric.WithUnit("{workers}")); err != nil {
return nil, errors.Wrap(err, "create active_workers gauge")
}

if m.activeClones, err = meter.Int64Gauge("cachew.scheduler.active_clones",
metric.WithDescription("Number of clone jobs currently executing"),
metric.WithUnit("{jobs}")); err != nil {
return nil, errors.Wrap(err, "create active_clones gauge")
return &schedulerMetrics{
queueDepth: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.queue_depth", "{jobs}", "Number of jobs waiting in the scheduler queue"),
activeWorkers: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.active_workers", "{workers}", "Number of workers currently executing jobs"),
activeClones: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.active_clones", "{jobs}", "Number of clone jobs currently executing"),
jobsTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.scheduler.jobs_total", "{jobs}", "Total number of completed scheduler jobs"),
jobDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.scheduler.job_duration", "{ms}", "Histogram of job durations"),
}

if m.jobsTotal, err = meter.Int64Counter("cachew.scheduler.jobs_total",
metric.WithDescription("Total number of completed scheduler jobs"),
metric.WithUnit("{jobs}")); err != nil {
return nil, errors.Wrap(err, "create jobs_total counter")
}

if m.jobDuration, err = meter.Float64Histogram("cachew.scheduler.job_duration_seconds",
metric.WithDescription("Duration of scheduler jobs in seconds"),
metric.WithUnit("s")); err != nil {
return nil, errors.Wrap(err, "create job_duration histogram")
}

return m, nil
}
42 changes: 42 additions & 0 deletions internal/metrics/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package metrics

import (
"reflect"

"go.opentelemetry.io/otel/metric"
)

func NewMetric[OM any](meter metric.Meter, path, unit, description string) OM {
u := metric.WithUnit(unit)
d := metric.WithDescription(description)
switch reflect.TypeFor[OM]().Name() {
case "Int64Gauge":
om, err := meter.Int64Gauge(path, u, d)
if err != nil {
panic(err)
}
return om.(OM)

case "Int64Counter":
om, err := meter.Int64Counter(path, u, d)
if err != nil {
panic(err)
}
return om.(OM)

case "Float64Histogram":
om, err := meter.Float64Histogram(path, u, d)
if err != nil {
panic(err)
}
return om.(OM)

case "Float64Counter":
om, err := meter.Float64Counter(path, u, d)
if err != nil {
panic(err)
}
return om.(OM)
}
panic("unsupported metric type")
}
17 changes: 17 additions & 0 deletions internal/metrics/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package metrics_test

import (
"testing"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/block/cachew/internal/metrics"
)

func TestNewMetric(t *testing.T) {
meter := otel.Meter("cachew.scheduler")
queueDepth := metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.queue_depth", "{jobs}",
"Number of jobs waiting in the scheduler queue")
queueDepth.Record(t.Context(), 10)
}
5 changes: 1 addition & 4 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func New(
return nil, errors.Wrap(err, "failed to create scheduler")
}

m, err := newGitMetrics()
if err != nil {
return nil, errors.Wrap(err, "create git metrics")
}
m := newGitMetrics()

s := &Strategy{
config: config,
Expand Down
30 changes: 7 additions & 23 deletions internal/strategy/git/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"time"

"github.com/alecthomas/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/block/cachew/internal/metrics"
)

type gitMetrics struct {
Expand All @@ -16,30 +17,13 @@ type gitMetrics struct {
requestTotal metric.Int64Counter
}

func newGitMetrics() (*gitMetrics, error) {
func newGitMetrics() *gitMetrics {
meter := otel.Meter("cachew.git")
m := &gitMetrics{}
var err error

if m.operationDuration, err = meter.Float64Histogram("cachew.git.operation_duration_seconds",
metric.WithDescription("Duration of git operations (clone, fetch, repack, snapshot)"),
metric.WithUnit("s")); err != nil {
return nil, errors.Wrap(err, "create operation_duration histogram")
}

if m.operationTotal, err = meter.Int64Counter("cachew.git.operations_total",
metric.WithDescription("Total number of git operations"),
metric.WithUnit("{operations}")); err != nil {
return nil, errors.Wrap(err, "create operations_total counter")
return &gitMetrics{
operationDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)"),
operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"),
requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"),
}

if m.requestTotal, err = meter.Int64Counter("cachew.git.requests_total",
metric.WithDescription("Total number of git HTTP requests by type"),
metric.WithUnit("{requests}")); err != nil {
return nil, errors.Wrap(err, "create requests_total counter")
}

return m, nil
}

// recordOperation records the duration and outcome of a git operation (clone, fetch, repack, snapshot).
Expand Down