Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,28 +328,31 @@ func runReceive(
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
Relabeller: relabeller,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
SplitTenantLabelName: conf.splitTenantLabelName,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
TenantAttributor: tenantAttributor,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
Relabeller: relabeller,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
SplitTenantLabelName: conf.splitTenantLabelName,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
TenantAttributor: tenantAttributor,
PoolingDisabled: !conf.poolingEnabled,
MaxPooledCompressedCap: conf.maxPooledCompressedCap,
MaxPooledDecompressedCap: conf.maxPooledDecompressedCap,
})

{
Expand Down Expand Up @@ -1039,6 +1042,11 @@ type receiveConfig struct {

tenantRulesConfig *extflag.PathOrContent
verifyTenantAttribution bool

// Pool configuration for receive-path buffer reuse.
poolingEnabled bool
maxPooledCompressedCap int
maxPooledDecompressedCap int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -1245,6 +1253,16 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.tenantRulesConfig = extflag.RegisterPathOrContent(cmd, "receive.tenant-rules", "YAML file that contains tenant attribution rules. Each rule maps label filters to a tenant ID. Rules are evaluated in order, first match wins.", extflag.WithEnvSubstitution())
cmd.Flag("receive.verify-tenant-attribution", "When enabled, tenant attribution rules are evaluated but only for verification. The HTTP header tenant is still used for actual routing/storage. Metrics are emitted to compare attributed vs HTTP tenant.").
Default("false").BoolVar(&rc.verifyTenantAttribution)

cmd.Flag("receive.pooling-enabled", "Enable pooling of buffers for receive-path request handling.").
Default("false").
BoolVar(&rc.poolingEnabled)
cmd.Flag("receive.max-pooled-compressed-cap", "Maximum capacity (bytes) of a compressed buffer that will be returned to the pool. Buffers larger than this are discarded to prevent pool ballooning.").
Default(fmt.Sprintf("%d", receive.DefaultMaxPooledCompressedCap)).
IntVar(&rc.maxPooledCompressedCap)
cmd.Flag("receive.max-pooled-decompressed-cap", "Maximum capacity (bytes) of a decompressed buffer that will be returned to the pool. Buffers larger than this are discarded to prevent pool ballooning.").
Default(fmt.Sprintf("%d", receive.DefaultMaxPooledDecompressedCap)).
IntVar(&rc.maxPooledDecompressedCap)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
186 changes: 87 additions & 99 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"bytes"
"cmp"
"context"
"crypto/tls"
"fmt"
Expand All @@ -22,7 +23,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/jpillora/backoff"
"github.com/klauspost/compress/s2"
"github.com/mwitkow/go-conntrack"
Expand All @@ -45,6 +45,7 @@ import (
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/syncutil"

extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/pool"
Expand All @@ -70,6 +71,15 @@ const (
labelSuccess = "success"
labelError = "error"
labelPreAgg = "__rollup__"

// DefaultMaxPooledCompressedCap is the maximum capacity of a compressed
// buffer that will be returned to the pool. Buffers that grew beyond this
// size are discarded to prevent pool ballooning.
DefaultMaxPooledCompressedCap = 1 << 20 // 1 MiB
// DefaultMaxPooledDecompressedCap is the maximum capacity of a
// decompressed buffer that will be returned to the pool. Buffers that
// grew beyond this size are discarded to prevent pool ballooning.
DefaultMaxPooledDecompressedCap = 4 << 20 // 4 MiB
)

type ReplicationProtocol string
Expand All @@ -90,64 +100,8 @@ var (

// Used internally to abort reads when the limiter is exceeded mid-stream.
errRequestTooLarge = errors.New("write request too large")

// Default / max capacities for pooled buffers. These caps prevent "pool ballooning"
// where a single large request permanently inflates process RSS.
defaultCompressedBufCap = 32 * 1024
defaultDecompressedBufCap = 128 * 1024
maxPooledCompressedCap = 1 << 20 // 1MB
maxPooledDecompressedCap = 4 << 20 // 4MB
copyBufSize = 32 * 1024

// Buffer/message pools to reduce allocations in receive hot path.
compressedBufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, defaultCompressedBufCap))
},
}
decompressedBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 0, defaultDecompressedBufCap)
return &b
},
}
writeRequestPool = sync.Pool{
New: func() interface{} {
return &prompb.WriteRequest{}
},
}
copyBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, copyBufSize)
return &b
},
}
)

// sizeLimiter is the subset of the request limiter needed to enforce
// per-tenant body-size limits while streaming. AllowSizeBytes reports
// whether the given tenant may consume the given cumulative number of
// bytes; a false return means the request should be rejected.
// NOTE(review): semantics inferred from usage in receiveHTTP — confirm
// against the Limiter implementation.
type sizeLimiter interface {
AllowSizeBytes(string, int64) bool
}

// limitedBufferWriter writes to a buffer but aborts if the tenant exceeds the limiter.
// This protects the server when Content-Length is missing or incorrect.
type limitedBufferWriter struct {
b *bytes.Buffer // destination buffer for the accepted bytes
limiter sizeLimiter // per-tenant size limiter consulted on every Write
tenant string // tenant ID used as the limiter key
seen int64 // running total of bytes offered to Write so far
}

// Write implements io.Writer. It tracks the cumulative number of bytes
// offered across all calls and consults the tenant's size limiter before
// forwarding anything to the underlying buffer. Once the running total
// exceeds the tenant's allowance, Write stores nothing and returns
// errRequestTooLarge.
func (w *limitedBufferWriter) Write(p []byte) (int, error) {
	n := len(p)
	if n == 0 {
		return 0, nil
	}
	// Count the bytes first so the limiter sees the would-be total,
	// including this chunk, before any of it is buffered.
	w.seen += int64(n)
	if w.limiter.AllowSizeBytes(w.tenant, w.seen) {
		return w.b.Write(p)
	}
	return 0, errRequestTooLarge
}

// zlabelsGet avoids ZLabels -> PromLabels conversion in hot paths.
func zlabelsGet(lbls []labelpb.ZLabel, name string) (string, bool) {
for _, l := range lbls {
Expand Down Expand Up @@ -187,6 +141,12 @@ type Options struct {
AsyncForwardWorkerCount uint
ReplicationProtocol ReplicationProtocol
TenantAttributor *TenantAttributor

// Pool configuration for receive-path buffer reuse.
// Note: If any of the capacity options are not set, we will use defaults.
PoolingDisabled bool
MaxPooledCompressedCap int
MaxPooledDecompressedCap int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -218,6 +178,9 @@ type Handler struct {
tenantAttributedTotal *prometheus.CounterVec

Limiter *Limiter

compressedBufPool *syncutil.Pool[*bytes.Buffer]
decompressedBufPool *syncutil.Pool[*[]byte]
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand All @@ -236,6 +199,10 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
}
level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)

// Default the options to the default values if they are not set.
o.MaxPooledCompressedCap = cmp.Or(o.MaxPooledCompressedCap, DefaultMaxPooledCompressedCap)
o.MaxPooledDecompressedCap = cmp.Or(o.MaxPooledDecompressedCap, DefaultMaxPooledDecompressedCap)

h := &Handler{
logger: logger,
writer: o.Writer,
Expand Down Expand Up @@ -346,6 +313,32 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The total number of time series attributed to each tenant by source.",
}, []string{"tenant", "source"},
),
compressedBufPool: syncutil.NewPool(func() *bytes.Buffer {
// Note: This 1KB initial capacity is a little bit arbitrary; we expect the buffer to
// grow as needed and be recycled internally such that at steady-state, there will be
// no more allocation overhead.
return bytes.NewBuffer(make([]byte, 0, 1024))
}).WithReset(func(b *bytes.Buffer) bool {
if b.Cap() <= o.MaxPooledCompressedCap {
b.Reset()
return true // return buffer to the pool.
}
return false // discard the buffer that is too large.
}).WithDisabled(o.PoolingDisabled).
Build(),
decompressedBufPool: syncutil.NewPool(func() *[]byte {
// We do not need to allocate capacity to this buffer here,
// as we will grow the buffer to the required size later.
// This is a requirement of the s2.Decode function.
b := make([]byte, 0)
return &b
}).WithReset(func(b *[]byte) bool {
if cap(*b) <= o.MaxPooledDecompressedCap {
*b = (*b)[:0]
return true // return buffer to the pool.
}
return false // discard the buffer that is too large.
}).WithDisabled(o.PoolingDisabled).Build(),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand Down Expand Up @@ -724,79 +717,74 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// Check if the request is too large before we read the body.
requestLimiter := h.Limiter.RequestLimiter()
// io.ReadAll dynamically adjusts the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := compressedBufPool.Get().(*bytes.Buffer)
defer func() {
// Avoid pooling huge buffers forever.
if compressed.Cap() <= maxPooledCompressedCap {
compressed.Reset()
compressedBufPool.Put(compressed)
}
}()

if r.ContentLength >= 0 {
if !requestLimiter.AllowSizeBytes(tenantHTTP, r.ContentLength) {
h.writeRejectedTotal.WithLabelValues("request_size", tenantHTTP).Inc()
http.Error(w, errRequestTooLarge.Error(), http.StatusRequestEntityTooLarge)
return
}
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
}

// Enforce size limits even when Content-Length is missing or wrong.
lw := &limitedBufferWriter{
b: compressed,
limiter: requestLimiter,
tenant: tenantHTTP,
// get a buffer to temporarily store compressed contents.
compressed, ret1 := h.compressedBufPool.Get()
defer ret1(compressed)
if r.ContentLength > 0 {
compressed.Grow(int(r.ContentLength))
}
copyBuf := copyBufPool.Get().(*[]byte)
defer copyBufPool.Put(copyBuf)
_, err = io.CopyBuffer(lw, r.Body, *copyBuf)

_, err = compressed.ReadFrom(r.Body)
if err != nil {
if err == errRequestTooLarge {
h.writeRejectedTotal.WithLabelValues("request_size", tenantHTTP).Inc()
level.Error(tLogger).Log("msg", "error reading compressed request body", "err", err)
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

// enforce size limits if content length was unknown:
if r.ContentLength < 0 {
if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(compressed.Len())) {
http.Error(w, errRequestTooLarge.Error(), http.StatusRequestEntityTooLarge)
return
}
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

// Decode into a pooled buffer to avoid allocs. (cap-guarded on return)
reqBuf := decompressedBufPool.Get().(*[]byte)
defer func() {
if cap(*reqBuf) <= maxPooledDecompressedCap {
*reqBuf = (*reqBuf)[:0]
decompressedBufPool.Put(reqBuf)
}
}()
*reqBuf, err = s2.Decode((*reqBuf)[:0], compressed.Bytes())
reqBuf, ret2 := h.decompressedBufPool.Get()
defer ret2(reqBuf)

decodeLen, err := s2.DecodedLen(compressed.Bytes())
if err != nil {
level.Error(tLogger).Log("msg", "snappy decode length error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode length error").Error(), http.StatusBadRequest)
return
}

// ¡Important! If decode len is greater than the capacity of the buffer, we need to grow the buffer
// otherwise s2 will allocate a new slice for us, ignoring the provided buffer.
// Without this check, in the worst case we would be reserving large blocks of memory
// just to send them to GC for collection.
if cap(*reqBuf) < decodeLen {
*reqBuf = slices.Grow(*reqBuf, decodeLen)
}
*reqBuf, err = s2.Decode(*reqBuf, compressed.Bytes())
if err != nil {
level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

// Enforce size limits after decompression.
if !requestLimiter.AllowSizeBytes(tenantHTTP, int64(len(*reqBuf))) {
h.writeRejectedTotal.WithLabelValues("request_size", tenantHTTP).Inc()
http.Error(w, errRequestTooLarge.Error(), http.StatusRequestEntityTooLarge)
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
wreq := writeRequestPool.Get().(*prompb.WriteRequest)
wreq.Reset()
defer func() {
wreq.Reset()
writeRequestPool.Put(wreq)
}()
if err := proto.Unmarshal(*reqBuf, wreq); err != nil {
wreq := &prompb.WriteRequest{}
if err := wreq.Unmarshal(*reqBuf); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
Loading
Loading