Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion drpcclient/dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func DialContext(ctx context.Context, address string, opts ...DialOption) (conn
}
}

collectMetrics := true
if options.metrics == nil {
collectMetrics = false
options.metrics = &drpcmetrics.ClientMetrics{}
}
return drpcconn.NewWithOptions(netConn, drpcconn.Options{
Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{
Expand All @@ -144,6 +149,7 @@ func DialContext(ctx context.Context, address string, opts ...DialOption) (conn
},
SoftCancel: false,
},
Metrics: options.metrics,
CollectMetrics: collectMetrics,
Metrics: *options.metrics,
}), nil
}
22 changes: 13 additions & 9 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ type Options struct {
// Manager controls the options we pass to the manager of this conn.
Manager drpcmanager.Options

// TODO: (server): deprecate this
// CollectStats controls whether the client should collect stats on the
// rpcs it creates.
CollectStats bool

// Metrics holds optional metrics the client will populate. If nil, no
// metrics are recorded.
Metrics *drpcmetrics.ClientMetrics
// CollectMetrics controls whether the client should collect metrics.
CollectMetrics bool

// Metrics holds optional metrics the client will populate.
Metrics drpcmetrics.ClientMetrics
}

// Conn is a drpc client connection.
Expand All @@ -41,7 +44,7 @@ type Conn struct {
mu sync.Mutex
wbuf []byte

stats map[string]*drpcstats.Stats
stats map[string]*drpcstats.Stats // TODO (server): deprecate
}

var _ drpc.Conn = (*Conn)(nil)
Expand All @@ -56,18 +59,19 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
tr: tr,
}

if opts.Metrics != nil {
mt := &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: opts.Metrics.BytesSent, BytesRecv: opts.Metrics.BytesRecv}
tr = mt
c.tr = tr
if opts.CollectMetrics {
mt := drpcmetrics.ToMeteredTransport(tr, opts.Metrics.BytesSent,
opts.Metrics.BytesRecv)
c.tr = mt
}

// TODO: (server): deprecate
if opts.CollectStats {
drpcopts.SetManagerStatsCB(&opts.Manager.Internal, c.getStats)
c.stats = make(map[string]*drpcstats.Stats)
}

c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
c.man = drpcmanager.NewWithOptions(c.tr, opts.Manager)

return c
}
Expand Down
73 changes: 48 additions & 25 deletions drpcmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,37 @@ import (
)

// Counter is a metric that can only be incremented (monotonically increasing).
// The labels parameter contains key-value pairs for metric dimensions
// (e.g. rpcService, rpcMethod). It may be nil when no
// dimensional context is available.
// The concrete type *must* provide a thread-safe implementation for these
// methods.
// The concrete type must provide a thread-safe implementation for the method.
type Counter interface {
Inc(labels map[string]string, v int64)
Inc(v int64)
}

// NoOpCounter is a Counter implementation that does nothing.
type NoOpCounter struct{}

// Inc implements Counter.
func (NoOpCounter) Inc(labels map[string]string, v int64) {}
func (NoOpCounter) Inc(v int64) {}

// LabeledCounter is a Counter that accepts dimensional labels on each
// increment. The labels parameter contains key-value pairs for metric
// dimensions. It may be nil when no dimensional context is available.
// The concrete type must provide a thread-safe implementation.
type LabeledCounter interface {
Inc(labels map[string]string, v int64)
}

// Gauge is a metric that can increase and decrease (e.g. pool size,
// active count). Update sets the gauge to the given absolute value.
// NoOpLabeledCounter is a LabeledCounter implementation that does nothing.
type NoOpLabeledCounter struct{}

// Inc implements LabeledCounter.
func (NoOpLabeledCounter) Inc(labels map[string]string, v int64) {}

// Gauge is a metric that can increase and decrease (e.g. pool size).
// Update sets the gauge to the given absolute value.
//
// Note: Gauge values may go up or down; Counter values must only increase.
// The concrete type *must* provide a thread-safe implementation for these
// methods.
// The concrete type must provide a thread-safe implementation for the
// method.
type Gauge interface {
Update(labels map[string]string, v int64)
}
Expand All @@ -42,30 +52,43 @@ type NoOpGauge struct{}
// Update implements Gauge.
func (NoOpGauge) Update(labels map[string]string, v int64) {}

// TODO (sujatha): Plug-in no-op implementation for nil metrics

// MeteredTransport wraps a Transport and increments byte counters on each
// meteredTransport wraps a Transport and increments byte counters on each
// Read and Write call.
type MeteredTransport struct {
type meteredTransport struct {
drpc.Transport
BytesSent Counter
BytesRecv Counter
bytesSent Counter
bytesRecv Counter
}

// ToMeteredTransport returns a transport that increments bytesSent and
// bytesRecv on each Write and Read call respectively. Nil counters are
// replaced with no-op implementations.
func ToMeteredTransport(tr drpc.Transport, bytesSent,
bytesRecv Counter) drpc.Transport {
if bytesSent == nil {
bytesSent = NoOpCounter{}
}
if bytesRecv == nil {
bytesRecv = NoOpCounter{}
}
return &meteredTransport{Transport: tr, bytesSent: bytesSent,
bytesRecv: bytesRecv}
}

// Read reads from the underlying transport and increments BytesRecv.
func (t *MeteredTransport) Read(p []byte) (n int, err error) {
// Read reads from the underlying transport and increments bytesRecv.
func (t *meteredTransport) Read(p []byte) (n int, err error) {
n, err = t.Transport.Read(p)
if n > 0 && t.BytesRecv != nil {
t.BytesRecv.Inc(nil, int64(n))
if n > 0 {
t.bytesRecv.Inc(int64(n))
}
return n, err
}

// Write writes to the underlying transport and increments BytesSent.
func (t *MeteredTransport) Write(p []byte) (n int, err error) {
// Write writes to the underlying transport and increments bytesSent.
func (t *meteredTransport) Write(p []byte) (n int, err error) {
n, err = t.Transport.Write(p)
if n > 0 && t.BytesSent != nil {
t.BytesSent.Inc(nil, int64(n))
if n > 0 {
t.bytesSent.Inc(int64(n))
}
return n, err
}
Expand Down
51 changes: 27 additions & 24 deletions drpcpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
// PoolMetrics holds optional metrics for connection pool monitoring.
type PoolMetrics struct {
PoolSize drpcmetrics.Gauge
ConnectionHitsTotal drpcmetrics.Counter
ConnectionMissesTotal drpcmetrics.Counter
ConnectionHitsTotal drpcmetrics.LabeledCounter
ConnectionMissesTotal drpcmetrics.LabeledCounter
}

// Options contains the options to configure a pool.
Expand All @@ -36,9 +36,8 @@ type Options struct {
// no values for any single key.
KeyCapacity int

// Metrics holds optional metrics the pool will populate. If nil,
// no metrics are recorded.
Metrics *PoolMetrics
// Metrics holds optional metrics the pool will populate.
Metrics PoolMetrics

// Labels holds optional labels to be attached to all metrics.
Labels map[string]string
Expand All @@ -61,36 +60,39 @@ func New[K comparable, V Conn](opts Options) *Pool[K, V] {
opts: opts,
entries: make(map[K]*list[K, V]),
}

pool.initPoolMetrics()

// emit the metric (0 value) so it shows up as soon as the pool is created
pool.updatePoolSize()
return &pool
}

func (p *Pool[K, V]) recordHit() {
if p.opts.Metrics == nil {
return
// initPoolMetrics copies the caller-supplied metrics into the pool,
// substituting no-op implementations for any nil fields.
func (p *Pool[K, V]) initPoolMetrics() {
metrics := &p.opts.Metrics
if metrics.PoolSize == nil {
metrics.PoolSize = drpcmetrics.NoOpGauge{}
}
if p.opts.Metrics.ConnectionHitsTotal != nil {
p.opts.Metrics.ConnectionHitsTotal.Inc(p.opts.Labels, 1)
if metrics.ConnectionHitsTotal == nil {
metrics.ConnectionHitsTotal = drpcmetrics.NoOpLabeledCounter{}
}
if metrics.ConnectionMissesTotal == nil {
metrics.ConnectionMissesTotal = drpcmetrics.NoOpLabeledCounter{}
}
}

func (p *Pool[K, V]) recordHit() {
p.opts.Metrics.ConnectionHitsTotal.Inc(p.opts.Labels, 1)
}

func (p *Pool[K, V]) recordMiss() {
if p.opts.Metrics == nil {
return
}
if p.opts.Metrics.ConnectionMissesTotal != nil {
p.opts.Metrics.ConnectionMissesTotal.Inc(p.opts.Labels, 1)
}
p.opts.Metrics.ConnectionMissesTotal.Inc(p.opts.Labels, 1)
}

func (p *Pool[K, V]) updatePoolSize() {
if p.opts.Metrics == nil {
return
}
if p.opts.Metrics.PoolSize != nil {
p.opts.Metrics.PoolSize.Update(p.opts.Labels, int64(p.order.count))
}
p.opts.Metrics.PoolSize.Update(p.opts.Labels, int64(p.order.count))
}

func (p *Pool[K, V]) log(what string, cb func() string) {
Expand Down Expand Up @@ -120,8 +122,9 @@ func (p *Pool[K, V]) Close() (err error) {
// Get returns a new Conn that will use the provided dial function to create an
// underlying conn to be cached by the Pool when Conn methods are invoked. It will
// share any cached connections with other conns that use the same key.
func (p *Pool[K, V]) Get(ctx context.Context, key K,
dial func(ctx context.Context, key K) (V, error)) Conn {
func (p *Pool[K, V]) Get(
ctx context.Context, key K, dial func(ctx context.Context, key K) (V, error),
) Conn {
return &poolConn[K, V]{
key: key,
pool: p,
Expand Down
6 changes: 3 additions & 3 deletions drpcpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func TestPoolMetrics_PutTakeClose(t *testing.T) {

pool := New[string, Conn](Options{
Capacity: 10,
Metrics: &PoolMetrics{
Metrics: PoolMetrics{
PoolSize: poolSize,
ConnectionHitsTotal: hits,
ConnectionMissesTotal: misses,
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestPoolMetrics_Eviction(t *testing.T) {
pool := New[string, Conn](Options{
Capacity: 1,
KeyCapacity: 1,
Metrics: &PoolMetrics{
Metrics: PoolMetrics{
PoolSize: poolSize,
ConnectionMissesTotal: misses,
},
Expand Down Expand Up @@ -529,7 +529,7 @@ func TestPoolMetrics_Eviction(t *testing.T) {
func TestPoolMetrics_NilFields(t *testing.T) {
// All PoolMetrics fields are nil — should not panic.
pool := New[string, Conn](Options{
Metrics: &PoolMetrics{},
Metrics: PoolMetrics{},
})

conn := &callbackConn{
Expand Down
35 changes: 11 additions & 24 deletions drpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,21 @@ type Options struct {
// restrictions. If it returns a non-nil error the connection is rejected.
TLSCipherRestrict func(conn net.Conn) error

// Metrics holds optional metrics the server will populate. If nil, no
// metrics are recorded.
Metrics *ServerMetrics
// Metrics holds optional metrics the server will populate.
Metrics ServerMetrics
}

// ServerMetrics holds optional metrics that the server will populate during
// operation.
// Metrics are defined and registered by the caller (e.g. in CockroachDB) and
// passed in; this package never imports a metrics library.
type ServerMetrics struct {
BytesSent drpcmetrics.Counter
BytesRecv drpcmetrics.Counter
TLSHandshakeErrors drpcmetrics.Counter
}

// addTLSHandshakeError increments the TLS handshake error counter.
func (m *ServerMetrics) addTLSHandshakeError() {
if m != nil && m.TLSHandshakeErrors != nil {
m.TLSHandshakeErrors.Inc(nil, 1)
}
}

// toMeteredTransport wraps tr with byte counters.
func (m *ServerMetrics) toMeteredTransport(tr drpc.Transport) drpc.Transport {
if m == nil {
return tr
}
return &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: m.BytesSent, BytesRecv: m.BytesRecv}
// recordTLSHandshakeError increments the TLS handshake error counter.
func (s *Server) recordTLSHandshakeError() {
s.opts.Metrics.TLSHandshakeErrors.Inc(1)
}

// Server is an implementation of drpc.Server to serve drpc connections.
Expand Down Expand Up @@ -103,12 +90,14 @@ func NewWithOptions(handler drpc.Handler, opts Options) *Server {
opts: opts,
handler: handler,
}

if s.opts.CollectStats {
// TODO: (server): deprecate stats
drpcopts.SetManagerStatsCB(&s.opts.Manager.Internal, s.getStats)
s.stats = make(map[string]*drpcstats.Stats)
}

if s.opts.Metrics.TLSHandshakeErrors == nil {
s.opts.Metrics.TLSHandshakeErrors = drpcmetrics.NoOpCounter{}
}
return s
}

Expand Down Expand Up @@ -156,12 +145,12 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
// anyway.
err := tlsConn.HandshakeContext(ctx)
if err != nil {
s.opts.Metrics.addTLSHandshakeError()
s.recordTLSHandshakeError()
return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err)
}
if s.opts.TLSCipherRestrict != nil {
if err := s.opts.TLSCipherRestrict(tlsConn); err != nil {
s.opts.Metrics.addTLSHandshakeError()
s.recordTLSHandshakeError()
return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err)
}
}
Expand All @@ -172,8 +161,6 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
}
}

tr = s.opts.Metrics.toMeteredTransport(tr)

man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
defer func() { err = errs.Combine(err, man.Close()) }()

Expand Down
Loading
Loading