From d097c812b9bac286a29ef3575261d502a15c6555 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> Date: Wed, 11 Mar 2026 17:25:22 -0700 Subject: [PATCH] Burst is per distributor. This tests proves it Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> --- .../ingestion_rate_strategy_test.go | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index fd8ea0d362e..786d8264c08 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -2,6 +2,7 @@ package distributor import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -121,6 +122,107 @@ func TestIngestionRateStrategy(t *testing.T) { } } +func TestGlobalIngestionRateStrategy_BurstBehavior(t *testing.T) { + t.Parallel() + + const ( + globalRate = 4000000.0 // 4M samples/sec global + burstSize = 800000 // 800K burst (not divided) + numDistributors = 10 + ) + + // Per-distributor rate should be globalRate / numDistributors = 400K/sec + expectedPerDistributorRate := globalRate / float64(numDistributors) + + limits := validation.Limits{ + IngestionRateStrategy: validation.GlobalIngestionRateStrategy, + IngestionRate: globalRate, + IngestionBurstSize: burstSize, + } + overrides := validation.NewOverrides(limits, nil) + + ring := newReadLifecyclerMock() + ring.On("HealthyInstancesCount").Return(numDistributors) + + strategy := newGlobalIngestionRateStrategy(overrides, ring) + + t.Run("rate is divided across distributors", func(t *testing.T) { + t.Parallel() + assert.Equal(t, expectedPerDistributorRate, strategy.Limit("test")) + }) + + t.Run("burst is not divided across distributors", func(t *testing.T) { + t.Parallel() + assert.Equal(t, burstSize, strategy.Burst("test")) + }) + + t.Run("token bucket allows burst then throttles sustained over-rate traffic", func(t *testing.T) { + t.Parallel() + rl := limiter.NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + // The bucket starts full at burstSize (800K tokens). + // Per-distributor rate refills at 400K tokens/sec. + // If we consume at 600K/sec (200K over rate), we drain the burst + // at 200K/sec, so burst lasts 800K / 200K = 4 seconds. + + batchSize := 60000 // 60K samples per batch + batchesPerSec := 10 // 10 batches/sec = 600K samples/sec + + batchInterval := time.Second / time.Duration(batchesPerSec) + totalAllowed := 0 + firstRejectedAt := time.Duration(0) + + // Simulate 10 seconds of traffic + for i := 0; i < batchesPerSec*10; i++ { + ts := now.Add(batchInterval * time.Duration(i)) + if rl.AllowN(ts, "test", batchSize) { + totalAllowed++ + } else if firstRejectedAt == 0 { + firstRejectedAt = batchInterval * time.Duration(i) + } + } + + // Should allow some batches initially (burst absorbs the overage) + assert.Greater(t, totalAllowed, 0, "some batches should be allowed initially") + + // Should eventually reject (burst exhausted) + assert.Greater(t, firstRejectedAt, time.Duration(0), "should eventually reject batches") + + // First rejection should happen around 4 seconds (800K burst / 200K overage per sec) + // Allow some tolerance due to discrete batch timing + assert.Greater(t, firstRejectedAt, 2*time.Second, "burst should sustain overage for more than 2s") + assert.Less(t, firstRejectedAt, 6*time.Second, "burst should be exhausted before 6s") + }) + + t.Run("sustained traffic at exactly per-distributor rate is not throttled", func(t *testing.T) { + t.Parallel() + rl := limiter.NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + // Send at exactly the per-distributor rate: 400K/sec in 40K batches, 10/sec + batchSize := 40000 + batchesPerSec := 10 + batchInterval := time.Second / time.Duration(batchesPerSec) + + // Simulate 30 seconds of traffic at the exact rate + for i := 0; i < batchesPerSec*30; i++ { + ts := now.Add(batchInterval * time.Duration(i)) + allowed := rl.AllowN(ts, "test", batchSize) + assert.True(t, allowed, "batch %d at %v should be allowed at sustained rate", i, batchInterval*time.Duration(i)) + } + }) + + t.Run("single request exceeding burst size is always rejected", func(t *testing.T) { + t.Parallel() + rl := limiter.NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + // A single request larger than burst size should always be rejected + assert.False(t, rl.AllowN(now, "test", burstSize+1)) + }) +} + type readLifecyclerMock struct { mock.Mock }