From 97b1508b88e86ce82aef616431bc87e5a78f592b Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:21:52 +0300 Subject: [PATCH 01/16] docs: add API rate limiting Phase 1 design spec Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../2026-06-30-api-rate-limiting-design.md | 217 ++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-30-api-rate-limiting-design.md diff --git a/docs/superpowers/specs/2026-06-30-api-rate-limiting-design.md b/docs/superpowers/specs/2026-06-30-api-rate-limiting-design.md new file mode 100644 index 00000000..bf2362e5 --- /dev/null +++ b/docs/superpowers/specs/2026-06-30-api-rate-limiting-design.md @@ -0,0 +1,217 @@ +# API Rate Limiting — Phase 1 (Tracking Only) + +**Date:** 2026-06-30 +**Status:** Draft +**Scope:** API backend (`api/`) + +## Problem + +The httpSMS API has no per-user rate limiting. Without it, a single user (free or paid) can +consume disproportionate resources. Before enforcing limits (returning 429), we need a +tracking-only phase to observe real usage patterns and validate thresholds. + +## Requirements + +1. **Per-user, plan-based daily limits** — each subscription tier gets a different + requests-per-day budget equal to `2 × SubscriptionName.Limit()`. +2. **Weighted counting** — GET list endpoints with a `limit` query param count that value + instead of 1 (e.g., `GET /v1/message-threads?limit=10` costs 10). +3. **Tracking only (Phase 1)** — the middleware never blocks requests. When a user exceeds + their daily budget, a CloudEvent is emitted once per window. +4. **Optional** — controlled by `RATE_LIMIT_ENABLED` env var (default `false`). Self-hosted + users are unaffected. +5. **Selective** — certain paths are excluded from rate limiting (e.g., `/v1/events` which is + called only by the system user). +6. **Cost-efficient** — the API receives ~20M requests/month and is hosted on Google Cloud Run + (instances restart every ~2 minutes). Per-request Redis calls are too expensive. Counters + live in memory and are batch-synced to Redis every 30 seconds. + +## Plan Tier Rate Limits + +| Plan | `Limit()` (messages) | `RateLimit()` (requests/day) | +|------|---------------------|------------------------------| +| Free | 200 | 400 | +| Pro | 5,000 | 10,000 | +| Ultra | 10,000 | 20,000 | +| 20K | 20,000 | 40,000 | +| 50K | 50,000 | 100,000 | +| 100K | 100,000 | 200,000 | +| 200K | 200,000 | 400,000 | + +## Architecture + +``` +Request → Auth Middleware → Rate Limit Middleware → Handler + │ + ┌───────┴────────┐ + │ In-Memory Map │ + │ (hot counter) │ + └───────┬────────┘ + │ every 30s + ┌───────┴────────┐ + │ Redis │ + │ (persistent) │ + └────────────────┘ +``` + +### Request Flow + +1. **Skip check**: if `RATE_LIMIT_ENABLED != "true"` or path is in the exclude list, call + `c.Next()` immediately. +2. **Extract identity**: read `AuthContext` from `c.Locals(ContextKeyAuthUserID)`. If not + authenticated (noop context), skip. +3. **Compute cost**: if the request is a GET with a `limit` query param, use + `min(parsedLimit, 200)` as the cost. Otherwise, cost = 1. +4. **Increment counter**: call `RateLimitService.Increment(ctx, userID, plan, cost)`. +5. **Check threshold**: if the returned count exceeds `plan.RateLimit()` and no event has + been emitted for this user in this window, emit a `rate.limit.exceeded` CloudEvent. + The "already notified" flag is stored in Redis (`rate_limit_notified:{user_id}` with 24h + TTL) so it survives instance restarts. +6. **Continue**: always call `c.Next()` — never block in Phase 1. + +### Hydration on Cold Start + +Cloud Run instances restart every ~2 minutes. When a user's first request hits a new instance: + +1. Read current count from Redis: `GET rate_limit:{user_id}` and `TTL rate_limit:{user_id}`. +2. Populate in-memory entry with the Redis count and remaining TTL as the window expiry. +3. Increment locally, mark delta as dirty. + +This means ~2 Redis reads per active user per instance startup. With batched writes every 30s, +total Redis operations are reduced from 20M/month to roughly 200K–400K/month. + +### Concurrent Instance Safety + +Multiple Cloud Run instances may run simultaneously. Safety is ensured by: + +- **Lazy load via `GET`**: each instance reads the latest Redis value on first access per user. +- **Atomic flush via `INCRBY`**: dirty deltas are flushed with `INCRBY`, not `SET`. This is + atomic and additive — concurrent flushes from different instances all contribute correctly. +- **TTL management**: the first instance to create a key sets a 24h TTL. Subsequent `INCRBY` + calls do not reset the TTL (Redis preserves TTL on INCRBY). + +## Components + +### New Files + +#### `pkg/services/rate_limit_service.go` + +Core rate limiting logic with in-memory counters and periodic Redis sync. + +```go +type RateLimitService struct { + tracer telemetry.Tracer + logger telemetry.Logger + client *redis.Client + dispatcher *EventDispatcher + + mu sync.Mutex + counters map[string]*userCounter + notified map[string]bool // in-memory cache; authoritative flag is in Redis + done chan struct{} +} + +type userCounter struct { + count int64 + windowExpiry time.Time + dirty int64 +} +``` + +**Methods:** + +- `NewRateLimitService(tracer, logger, client, dispatcher) *RateLimitService` — starts + background flush goroutine (every 30s). +- `Increment(ctx, userID string, plan SubscriptionName, cost int) (count int64, exceeded bool, err error)` — + increments counter, returns current count and whether the limit is exceeded. +- `Close()` — flushes remaining dirty counters and stops the background goroutine. Called on + graceful shutdown. +- `flush(ctx)` — iterates all dirty counters, calls `INCRBY` for each, resets dirty to 0. + Expired windows (past 24h) are cleaned up during flush. +- `hydrate(ctx, userID string) (*userCounter, error)` — reads `GET` + `TTL` from Redis, + returns populated counter. If key doesn't exist, returns a fresh counter with + `windowExpiry = now + 24h`. + +#### `pkg/middlewares/rate_limit_middleware.go` + +Fiber middleware that wraps `RateLimitService`. + +```go +func RateLimit( + tracer telemetry.Tracer, + service *services.RateLimitService, + excludePaths []string, +) fiber.Handler +``` + +- Reads `RATE_LIMIT_ENABLED` env var once at creation (not per-request). +- Uses prefix matching for exclude paths. +- Extracts cost from `limit` query param on GET requests, capped at 200. + +#### `pkg/events/rate_limit_exceeded_event.go` + +```go +const RateLimitExceeded = "rate.limit.exceeded" + +type RateLimitExceededPayload struct { + UserID string `json:"user_id"` + Count int64 `json:"count"` + Limit uint `json:"limit"` + Plan string `json:"plan"` +} +``` + +### Modified Files + +#### `pkg/entities/user.go` + +Add method: + +```go +func (subscription SubscriptionName) RateLimit() uint { + return subscription.Limit() * 2 +} +``` + +#### `pkg/di/container.go` + +- Add `RedisClient() *redis.Client` method — extracts Redis client creation from `Cache()` + so both `Cache` and `RateLimitService` share the same connection. +- Add `RateLimitService() *services.RateLimitService` method. +- Register the rate limit middleware after API key auth in `App()`: + ```go + app.Use(middlewares.RateLimit( + container.Tracer(), + container.RateLimitService(), + []string{"/v1/events"}, + )) + ``` + +#### `.env.docker` / `.env.example` + +Add: +``` +RATE_LIMIT_ENABLED=false +``` + +## Excluded Paths + +The following paths are excluded from rate limiting: + +- `/v1/events` — called only by the system user +- `/health` — registered before all middleware, naturally excluded + +## Testing Strategy + +- Unit tests for `RateLimitService`: increment, window expiry, hydration, flush, weighted + cost, concurrent access. +- Unit test for `SubscriptionName.RateLimit()`. +- Integration test for the middleware: verify it skips excluded paths, skips when disabled, + computes correct cost, and emits events on threshold breach. + +## Future Work (Phase 2) + +- Return `429 Too Many Requests` when the limit is exceeded. +- Add `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset` response headers. +- Dashboard/UI for users to see their current usage. +- Admin override to adjust individual user limits. From 672d3d9bff8861f570f58bb9c6fbe3ce71a7d7af Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:30:54 +0300 Subject: [PATCH 02/16] docs: add API rate limiting implementation plan Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../plans/2026-06-30-api-rate-limiting.md | 804 ++++++++++++++++++ 1 file changed, 804 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-30-api-rate-limiting.md diff --git a/docs/superpowers/plans/2026-06-30-api-rate-limiting.md b/docs/superpowers/plans/2026-06-30-api-rate-limiting.md new file mode 100644 index 00000000..01de56a4 --- /dev/null +++ b/docs/superpowers/plans/2026-06-30-api-rate-limiting.md @@ -0,0 +1,804 @@ +# API Rate Limiting (Phase 1 — Tracking Only) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add per-user, plan-based API rate limiting that tracks usage in-memory with periodic Redis sync, emitting a CloudEvent when limits are exceeded — without blocking any requests. + +**Architecture:** A custom Fiber middleware increments an in-memory counter per user per request (with weighted costs for list endpoints). A background goroutine flushes dirty counters to Redis every 30 seconds via atomic `INCRBY`. On cold start, counters are lazily hydrated from Redis. When a user exceeds their plan's daily limit, a `rate.limit.exceeded` CloudEvent is dispatched (deduplicated via Redis flag). + +**Tech Stack:** Go, Fiber v3, go-redis/v9, CloudEvents, OpenTelemetry tracing + +## Global Constraints + +- Go 1.21+ (module: `github.com/NdoleStudio/httpsms`) +- Error handling: always wrap with `github.com/palantir/stacktrace` +- Tracing: pass `telemetry.Tracer` and start spans in all public methods +- Redis key prefix: `rate_limit:` +- Rate limit window: 24 hours (sliding, based on Redis TTL) +- Rate limit budget: `SubscriptionName.Limit() * 2` requests per day +- Environment variable: `RATE_LIMIT_ENABLED` (default `"false"`) +- Excluded paths: `/v1/events` +- Phase 1: never return 429, always call `c.Next()` + +--- + +### Task 1: Add `RateLimit()` Method to `SubscriptionName` + +**Files:** +- Modify: `api/pkg/entities/user.go:38` (after `Limit()` method) +- Test: `api/pkg/entities/user_test.go` + +**Interfaces:** +- Consumes: existing `SubscriptionName.Limit() uint` +- Produces: `SubscriptionName.RateLimit() uint` — returns `Limit() * 2` + +- [ ] **Step 1: Write the failing test** + +Add to `api/pkg/entities/user_test.go`: + +```go +func TestSubscriptionName_RateLimit_Free(t *testing.T) { + assert.Equal(t, uint(400), SubscriptionNameFree.RateLimit()) +} + +func TestSubscriptionName_RateLimit_Pro(t *testing.T) { + assert.Equal(t, uint(10000), SubscriptionNameProMonthly.RateLimit()) +} + +func TestSubscriptionName_RateLimit_Ultra(t *testing.T) { + assert.Equal(t, uint(20000), SubscriptionNameUltraMonthly.RateLimit()) +} + +func TestSubscriptionName_RateLimit_200K(t *testing.T) { + assert.Equal(t, uint(400000), SubscriptionName200KMonthly.RateLimit()) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd api && go test ./pkg/entities/ -run TestSubscriptionName_RateLimit -v` +Expected: FAIL — `RateLimit` method not found + +- [ ] **Step 3: Write minimal implementation** + +Add to `api/pkg/entities/user.go` after the `Limit()` method (after line 38): + +```go +// RateLimit returns the daily API request rate limit for a subscription +func (subscription SubscriptionName) RateLimit() uint { + return subscription.Limit() * 2 +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd api && go test ./pkg/entities/ -run TestSubscriptionName_RateLimit -v` +Expected: PASS (all 4 tests) + +- [ ] **Step 5: Commit** + +```bash +cd api && git add pkg/entities/user.go pkg/entities/user_test.go +git commit -m "feat(entities): add RateLimit() method to SubscriptionName" +``` + +--- + +### Task 2: Add `rate.limit.exceeded` CloudEvent + +**Files:** +- Create: `api/pkg/events/rate_limit_exceeded_event.go` + +**Interfaces:** +- Consumes: `entities.UserID` +- Produces: `events.RateLimitExceeded` constant, `events.RateLimitExceededPayload` struct + +- [ ] **Step 1: Create the event file** + +Create `api/pkg/events/rate_limit_exceeded_event.go`: + +```go +package events + +import ( + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" +) + +// RateLimitExceeded is raised when a user exceeds their daily API rate limit. +const RateLimitExceeded = "rate.limit.exceeded" + +// RateLimitExceededPayload stores the data for the RateLimitExceeded event +type RateLimitExceededPayload struct { + UserID entities.UserID `json:"user_id"` + Count int64 `json:"count"` + Limit uint `json:"limit"` + Plan string `json:"plan"` + Timestamp time.Time `json:"timestamp"` +} +``` + +- [ ] **Step 2: Verify it compiles** + +Run: `cd api && go build ./pkg/events/` +Expected: Success (exit code 0) + +- [ ] **Step 3: Commit** + +```bash +cd api && git add pkg/events/rate_limit_exceeded_event.go +git commit -m "feat(events): add rate.limit.exceeded CloudEvent type" +``` + +--- + +### Task 3: Implement `RateLimitService` + +**Files:** +- Create: `api/pkg/services/rate_limit_service.go` +- Create: `api/pkg/services/rate_limit_service_test.go` + +**Interfaces:** +- Consumes: `telemetry.Tracer`, `telemetry.Logger`, `*redis.Client`, `*EventDispatcher`, `entities.SubscriptionName.RateLimit() uint`, `events.RateLimitExceeded`, `events.RateLimitExceededPayload` +- Produces: + - `NewRateLimitService(tracer telemetry.Tracer, logger telemetry.Logger, client *redis.Client, dispatcher *EventDispatcher) *RateLimitService` + - `(*RateLimitService).Increment(ctx context.Context, userID entities.UserID, plan entities.SubscriptionName, cost int) (count int64, exceeded bool, err error)` + - `(*RateLimitService).Close()` + +- [ ] **Step 1: Write the failing tests** + +Create `api/pkg/services/rate_limit_service_test.go`: + +```go +package services + +import ( + "context" + "testing" + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateLimitService_Increment_BasicCount(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(1), count) + assert.False(t, exceeded) +} + +func TestRateLimitService_Increment_WeightedCost(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + count, _, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 10) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(10), count) +} + +func TestRateLimitService_Increment_ExceedsLimit(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Free plan limit is 400. Exceed it. + for i := 0; i < 400; i++ { + _, _, _ = svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + } + + // Act — this pushes count to 401 + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(401), count) + assert.True(t, exceeded) +} + +func TestRateLimitService_Increment_MultipleUsers(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + _, _, _ = svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 5) + count, _, err := svc.Increment(ctx, "user-2", entities.SubscriptionNameProMonthly, 3) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(3), count) +} + +func TestRateLimitService_Increment_WindowExpiry(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Simulate an existing counter with an expired window + svc.mu.Lock() + svc.counters["user-1"] = &userCounter{ + count: 500, + windowExpiry: time.Now().Add(-1 * time.Hour), // expired + dirty: 0, + } + svc.mu.Unlock() + + // Act — should reset because the window expired + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(1), count) + assert.False(t, exceeded) +} + +// newTestRateLimitService creates a RateLimitService with nil redis client (no hydration) +// suitable for unit tests that only test in-memory logic. +func newTestRateLimitService(t *testing.T) *RateLimitService { + t.Helper() + return NewRateLimitService(nil, nil, nil, nil) +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd api && go test ./pkg/services/ -run TestRateLimitService -v` +Expected: FAIL — `RateLimitService` type not found + +- [ ] **Step 3: Write the implementation** + +Create `api/pkg/services/rate_limit_service.go`: + +```go +package services + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/events" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" + "github.com/redis/go-redis/v9" +) + +const ( + rateLimitKeyPrefix = "rate_limit:" + rateLimitNotifiedPrefix = "rate_limit_notified:" + rateLimitWindow = 24 * time.Hour + rateLimitFlushInterval = 30 * time.Second +) + +// RateLimitService tracks per-user API request counts with in-memory counters +// and periodic Redis sync. +type RateLimitService struct { + service + tracer telemetry.Tracer + logger telemetry.Logger + client *redis.Client + dispatcher *EventDispatcher + + mu sync.Mutex + counters map[string]*userCounter + notified map[string]bool + done chan struct{} +} + +type userCounter struct { + count int64 + windowExpiry time.Time + dirty int64 +} + +// NewRateLimitService creates a new RateLimitService and starts the background flush goroutine. +func NewRateLimitService( + tracer telemetry.Tracer, + logger telemetry.Logger, + client *redis.Client, + dispatcher *EventDispatcher, +) *RateLimitService { + svc := &RateLimitService{ + tracer: tracer, + logger: logger, + client: client, + dispatcher: dispatcher, + counters: make(map[string]*userCounter), + notified: make(map[string]bool), + done: make(chan struct{}), + } + + go svc.flushLoop() + return svc +} + +// Increment adds cost to the user's counter and returns the current count. +// If the count exceeds the plan's rate limit, exceeded is true. +func (svc *RateLimitService) Increment(ctx context.Context, userID entities.UserID, plan entities.SubscriptionName, cost int) (count int64, exceeded bool, err error) { + svc.mu.Lock() + defer svc.mu.Unlock() + + key := string(userID) + counter, exists := svc.counters[key] + + if !exists { + counter, err = svc.hydrate(ctx, key) + if err != nil { + counter = &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + } + } + svc.counters[key] = counter + } + + // Reset if window has expired + if time.Now().After(counter.windowExpiry) { + counter.count = 0 + counter.dirty = 0 + counter.windowExpiry = time.Now().Add(rateLimitWindow) + svc.notified[key] = false + } + + counter.count += int64(cost) + counter.dirty += int64(cost) + + limit := plan.RateLimit() + exceeded = counter.count > int64(limit) + + if exceeded && !svc.notified[key] { + svc.notified[key] = true + go svc.emitExceededEvent(ctx, userID, counter.count, limit, plan) + } + + return counter.count, exceeded, nil +} + +// Close flushes remaining dirty counters and stops the background goroutine. +func (svc *RateLimitService) Close() { + close(svc.done) + if svc.client != nil { + svc.flush(context.Background()) + } +} + +func (svc *RateLimitService) flushLoop() { + ticker := time.NewTicker(rateLimitFlushInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if svc.client != nil { + svc.flush(context.Background()) + } + case <-svc.done: + return + } + } +} + +func (svc *RateLimitService) flush(ctx context.Context) { + svc.mu.Lock() + // Collect dirty entries + type flushEntry struct { + key string + delta int64 + ttl time.Duration + } + var entries []flushEntry + now := time.Now() + + for key, counter := range svc.counters { + // Clean up expired windows + if now.After(counter.windowExpiry) { + delete(svc.counters, key) + delete(svc.notified, key) + continue + } + if counter.dirty > 0 { + entries = append(entries, flushEntry{ + key: rateLimitKeyPrefix + key, + delta: counter.dirty, + ttl: time.Until(counter.windowExpiry), + }) + counter.dirty = 0 + } + } + svc.mu.Unlock() + + // Flush to Redis outside the lock + for _, entry := range entries { + pipe := svc.client.Pipeline() + pipe.IncrBy(ctx, entry.key, entry.delta) + pipe.ExpireNX(ctx, entry.key, entry.ttl) + if _, err := pipe.Exec(ctx); err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) + } + } + } +} + +func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userCounter, error) { + if svc.client == nil { + return &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + }, nil + } + + key := rateLimitKeyPrefix + userID + countStr, err := svc.client.Get(ctx, key).Result() + if err == redis.Nil { + return &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + }, nil + } + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot hydrate rate limit for user [%s]", userID)) + } + + count, err := strconv.ParseInt(countStr, 10, 64) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID)) + } + + ttl, err := svc.client.TTL(ctx, key).Result() + if err != nil || ttl <= 0 { + ttl = rateLimitWindow + } + + // Check if already notified (in Redis) + notifiedKey := rateLimitNotifiedPrefix + userID + if exists, _ := svc.client.Exists(ctx, notifiedKey).Result(); exists > 0 { + svc.notified[userID] = true + } + + return &userCounter{ + count: count, + windowExpiry: time.Now().Add(ttl), + dirty: 0, + }, nil +} + +func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entities.UserID, count int64, limit uint, plan entities.SubscriptionName) { + if svc.dispatcher == nil { + return + } + + // Set notified flag in Redis (24h TTL) to survive restarts + if svc.client != nil { + notifiedKey := rateLimitNotifiedPrefix + string(userID) + svc.client.Set(ctx, notifiedKey, "1", rateLimitWindow) + } + + payload := events.RateLimitExceededPayload{ + UserID: userID, + Count: count, + Limit: limit, + Plan: string(plan), + Timestamp: time.Now().UTC(), + } + + event, err := svc.createEvent(events.RateLimitExceeded, string(userID), payload) + if err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID))) + } + return + } + + if err = svc.dispatcher.Dispatch(ctx, event); err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID))) + } + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd api && go test ./pkg/services/ -run TestRateLimitService -v` +Expected: PASS (all 5 tests) + +- [ ] **Step 5: Commit** + +```bash +cd api && git add pkg/services/rate_limit_service.go pkg/services/rate_limit_service_test.go +git commit -m "feat(services): implement RateLimitService with in-memory counters and Redis sync" +``` + +--- + +### Task 4: Implement Rate Limit Middleware + +**Files:** +- Create: `api/pkg/middlewares/rate_limit_middleware.go` + +**Interfaces:** +- Consumes: `telemetry.Tracer`, `*services.RateLimitService`, `entities.AuthContext` (from `c.Locals`), `entities.SubscriptionName` +- Produces: `middlewares.RateLimit(tracer telemetry.Tracer, logger telemetry.Logger, service *services.RateLimitService, userRepository repositories.UserRepository, excludePaths []string) fiber.Handler` + +- [ ] **Step 1: Create the middleware** + +Create `api/pkg/middlewares/rate_limit_middleware.go`: + +```go +package middlewares + +import ( + "os" + "strconv" + "strings" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/repositories" + "github.com/NdoleStudio/httpsms/pkg/services" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/gofiber/fiber/v3" +) + +const rateLimitCostCap = 200 + +// RateLimit tracks per-user API request counts without blocking requests. +func RateLimit( + tracer telemetry.Tracer, + logger telemetry.Logger, + service *services.RateLimitService, + userRepository repositories.UserRepository, + excludePaths []string, +) fiber.Handler { + enabled := os.Getenv("RATE_LIMIT_ENABLED") == "true" + logger = logger.WithService("middlewares.RateLimit") + + return func(c fiber.Ctx) error { + if !enabled { + return c.Next() + } + + // Check excluded paths + path := c.Path() + for _, excluded := range excludePaths { + if strings.HasPrefix(path, excluded) { + return c.Next() + } + } + + ctx, span := tracer.StartFromFiberCtx(c, "middlewares.RateLimit") + defer span.End() + + // Extract authenticated user + authUser, ok := c.Locals(ContextKeyAuthUserID).(entities.AuthContext) + if !ok || authUser.IsNoop() { + return c.Next() + } + + // Compute cost + cost := 1 + if c.Method() == fiber.MethodGet { + if limitParam := c.Query("limit"); limitParam != "" { + if parsed, err := strconv.Atoi(limitParam); err == nil && parsed > 0 { + cost = parsed + if cost > rateLimitCostCap { + cost = rateLimitCostCap + } + } + } + } + + // Load user's subscription plan + user, err := userRepository.Load(ctx, authUser.ID) + if err != nil { + ctxLogger := tracer.CtxLogger(logger, span) + ctxLogger.Error(err) + return c.Next() + } + + // Increment rate limit counter + _, _, _ = service.Increment(ctx, authUser.ID, user.SubscriptionName, cost) + + return c.Next() + } +} +``` + +- [ ] **Step 2: Verify it compiles** + +Run: `cd api && go build ./pkg/middlewares/` +Expected: Success (exit code 0) + +- [ ] **Step 3: Commit** + +```bash +cd api && git add pkg/middlewares/rate_limit_middleware.go +git commit -m "feat(middlewares): add rate limit middleware for tracking API usage" +``` + +--- + +### Task 5: Wire Everything in the DI Container and Add Env Config + +**Files:** +- Modify: `api/pkg/di/container.go:85-99` (add `rateLimitService` and `redisClient` fields to Container struct) +- Modify: `api/pkg/di/container.go:443-469` (extract Redis client creation) +- Modify: `api/pkg/di/container.go:203-206` (add rate limit middleware to the chain) +- Modify: `api/.env.docker` + +**Interfaces:** +- Consumes: `services.NewRateLimitService(...)`, `middlewares.RateLimit(...)`, all existing container methods +- Produces: `Container.RedisClient() *redis.Client`, `Container.RateLimitService() *services.RateLimitService` + +- [ ] **Step 1: Add fields to the Container struct** + +In `api/pkg/di/container.go`, add to the `Container` struct (after line 98, `inMemoryCache cache.Cache`): + +```go + rateLimitService *services.RateLimitService + redisClient *redis.Client +``` + +- [ ] **Step 2: Add `RedisClient()` method** + +Add after the existing `Cache()` method (after line 469): + +```go +// RedisClient creates or returns the shared *redis.Client +func (container *Container) RedisClient() *redis.Client { + if container.redisClient != nil { + return container.redisClient + } + + container.logger.Debug("creating *redis.Client") + opt, err := redis.ParseURL(os.Getenv("REDIS_URL")) + if err != nil { + container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot parse redis url [%s]", os.Getenv("REDIS_URL")))) + } + if strings.HasPrefix(os.Getenv("REDIS_URL"), "rediss://") { + opt.TLSConfig = &tls.Config{ + MinVersion: tls.VersionTLS12, + } + } + + container.redisClient = redis.NewClient(opt) + + if err = redisotel.InstrumentTracing(container.redisClient); err != nil { + container.logger.Error(stacktrace.Propagate(err, "cannot instrument redis tracing")) + } + if err = redisotel.InstrumentMetrics(container.redisClient); err != nil { + container.logger.Fatal(stacktrace.Propagate(err, "cannot instrument redis metrics")) + } + + return container.redisClient +} +``` + +- [ ] **Step 3: Refactor `Cache()` to use `RedisClient()`** + +Replace the body of `Cache()` to reuse the shared client: + +```go +// Cache creates a new instance of cache.Cache +func (container *Container) Cache() cache.Cache { + container.logger.Debug("creating cache.Cache") + return cache.NewRedisCache(container.Tracer(), container.RedisClient()) +} +``` + +- [ ] **Step 4: Add `RateLimitService()` method** + +Add after `RedisClient()`: + +```go +// RateLimitService creates or returns the shared *services.RateLimitService +func (container *Container) RateLimitService() *services.RateLimitService { + if container.rateLimitService != nil { + return container.rateLimitService + } + + container.logger.Debug("creating services.RateLimitService") + container.rateLimitService = services.NewRateLimitService( + container.Tracer(), + container.Logger(), + container.RedisClient(), + container.EventDispatcher(), + ) + return container.rateLimitService +} +``` + +- [ ] **Step 5: Register middleware in `App()`** + +In the `App()` method, add the rate limit middleware after the API key auth line (`app.Use(middlewares.APIKeyAuth(...))`). Insert after line 205: + +```go + app.Use(middlewares.RateLimit( + container.Tracer(), + container.Logger(), + container.RateLimitService(), + container.UserRepository(), + []string{"/v1/events"}, + )) +``` + +- [ ] **Step 6: Add `RATE_LIMIT_ENABLED` to `.env.docker`** + +Add after the `REDIS_URL` line in `api/.env.docker`: + +``` +# Rate limiting (set to "true" to enable per-user API rate tracking) +RATE_LIMIT_ENABLED=false +``` + +- [ ] **Step 7: Verify it compiles and tests pass** + +Run: `cd api && go build . && go test ./...` +Expected: Build succeeds, all tests pass + +- [ ] **Step 8: Commit** + +```bash +cd api && git add pkg/di/container.go .env.docker +git commit -m "feat(di): wire rate limit service and middleware into DI container" +``` + +--- + +### Task 6: End-to-End Verification + +**Files:** +- No new files — verification only + +**Interfaces:** +- Consumes: all previous tasks + +- [ ] **Step 1: Run the full test suite** + +Run: `cd api && go test ./... -count=1` +Expected: All tests pass + +- [ ] **Step 2: Build the binary** + +Run: `cd api && go build -o ./tmp/main.exe .` +Expected: Build succeeds + +- [ ] **Step 3: Verify Docker Compose build (optional, if Docker available)** + +Run: `docker compose build api` +Expected: Build succeeds + +- [ ] **Step 4: Final commit (if any formatting fixes needed)** + +Run: `cd api && go fmt ./... && go vet ./...` +If changes: commit with `style: format rate limiting code` From d28244015a149f34beeb8160075f8762dee67623 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:33:41 +0300 Subject: [PATCH 03/16] feat(entities): add RateLimit() method to SubscriptionName - Returns subscription.Limit() * 2 for daily API request budgets - Add 4 test cases covering Free, Pro, Ultra, and 200K subscriptions - RateLimit(): Free=400, Pro=10000, Ultra=20000, 200K=400000 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/entities/user.go | 5 +++++ api/pkg/entities/user_test.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/api/pkg/entities/user.go b/api/pkg/entities/user.go index 6cf2eba9..94fda428 100644 --- a/api/pkg/entities/user.go +++ b/api/pkg/entities/user.go @@ -37,6 +37,11 @@ func (subscription SubscriptionName) Limit() uint { } } +// RateLimit returns the daily API request rate limit for a subscription +func (subscription SubscriptionName) RateLimit() uint { + return subscription.Limit() * 2 +} + // SubscriptionNameFree represents a free subscription const SubscriptionNameFree = SubscriptionName("free") diff --git a/api/pkg/entities/user_test.go b/api/pkg/entities/user_test.go index 0417e63f..63dcf3a6 100644 --- a/api/pkg/entities/user_test.go +++ b/api/pkg/entities/user_test.go @@ -51,3 +51,19 @@ func TestUser_GetBillingAnchorDay_PaidUserDay31(t *testing.T) { } assert.Equal(t, 31, user.GetBillingAnchorDay()) } + +func TestSubscriptionName_RateLimit_Free(t *testing.T) { + assert.Equal(t, uint(400), SubscriptionNameFree.RateLimit()) +} + +func TestSubscriptionName_RateLimit_Pro(t *testing.T) { + assert.Equal(t, uint(10000), SubscriptionNameProMonthly.RateLimit()) +} + +func TestSubscriptionName_RateLimit_Ultra(t *testing.T) { + assert.Equal(t, uint(20000), SubscriptionNameUltraMonthly.RateLimit()) +} + +func TestSubscriptionName_RateLimit_200K(t *testing.T) { + assert.Equal(t, uint(400000), SubscriptionName200KMonthly.RateLimit()) +} From 0d671c2e5de32555c2c5b52702e2e105152af697 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:35:25 +0300 Subject: [PATCH 04/16] feat(events): add rate.limit.exceeded CloudEvent type Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/events/rate_limit_exceeded_event.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 api/pkg/events/rate_limit_exceeded_event.go diff --git a/api/pkg/events/rate_limit_exceeded_event.go b/api/pkg/events/rate_limit_exceeded_event.go new file mode 100644 index 00000000..d27f1731 --- /dev/null +++ b/api/pkg/events/rate_limit_exceeded_event.go @@ -0,0 +1,19 @@ +package events + +import ( + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" +) + +// RateLimitExceeded is raised when a user exceeds their daily API rate limit. +const RateLimitExceeded = "rate.limit.exceeded" + +// RateLimitExceededPayload stores the data for the RateLimitExceeded event +type RateLimitExceededPayload struct { + UserID entities.UserID `json:"user_id"` + Count int64 `json:"count"` + Limit uint `json:"limit"` + Plan string `json:"plan"` + Timestamp time.Time `json:"timestamp"` +} From 65ca2ddaed32871b5af90b3cdb56af2e8679e17e Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:40:44 +0300 Subject: [PATCH 05/16] feat(services): implement RateLimitService with in-memory counters and Redis sync - Add RateLimitService with per-user API request tracking - Implement in-memory counters with 24h sliding windows - Add background flush to Redis every 30s with pipelining - Emit RateLimitExceeded events when limits are exceeded - Support graceful nil handling for tracer, logger, client, dispatcher - Add comprehensive tests for basic counting, weighted costs, limit enforcement, window expiry Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/services/rate_limit_service.go | 252 ++++++++++++++++++++ api/pkg/services/rate_limit_service_test.go | 111 +++++++++ 2 files changed, 363 insertions(+) create mode 100644 api/pkg/services/rate_limit_service.go create mode 100644 api/pkg/services/rate_limit_service_test.go diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go new file mode 100644 index 00000000..3498c3cb --- /dev/null +++ b/api/pkg/services/rate_limit_service.go @@ -0,0 +1,252 @@ +package services + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/events" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" + "github.com/redis/go-redis/v9" +) + +const ( + rateLimitKeyPrefix = "rate_limit:" + rateLimitNotifiedPrefix = "rate_limit_notified:" + rateLimitWindow = 24 * time.Hour + rateLimitFlushInterval = 30 * time.Second +) + +// RateLimitService tracks per-user API request counts with in-memory counters +// and periodic Redis sync. +type RateLimitService struct { + service + tracer telemetry.Tracer + logger telemetry.Logger + client *redis.Client + dispatcher *EventDispatcher + + mu sync.Mutex + counters map[string]*userCounter + notified map[string]bool + done chan struct{} +} + +type userCounter struct { + count int64 + windowExpiry time.Time + dirty int64 +} + +// NewRateLimitService creates a new RateLimitService and starts the background flush goroutine. +func NewRateLimitService( + tracer telemetry.Tracer, + logger telemetry.Logger, + client *redis.Client, + dispatcher *EventDispatcher, +) *RateLimitService { + svc := &RateLimitService{ + tracer: tracer, + logger: logger, + client: client, + dispatcher: dispatcher, + counters: make(map[string]*userCounter), + notified: make(map[string]bool), + done: make(chan struct{}), + } + + go svc.flushLoop() + return svc +} + +// Increment adds cost to the user's counter and returns the current count. +// If the count exceeds the plan's rate limit, exceeded is true. +func (svc *RateLimitService) Increment(ctx context.Context, userID entities.UserID, plan entities.SubscriptionName, cost int) (count int64, exceeded bool, err error) { + svc.mu.Lock() + defer svc.mu.Unlock() + + key := string(userID) + counter, exists := svc.counters[key] + + if !exists { + counter, err = svc.hydrate(ctx, key) + if err != nil { + counter = &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + } + } + svc.counters[key] = counter + } + + // Reset if window has expired + if time.Now().After(counter.windowExpiry) { + counter.count = 0 + counter.dirty = 0 + counter.windowExpiry = time.Now().Add(rateLimitWindow) + svc.notified[key] = false + } + + counter.count += int64(cost) + counter.dirty += int64(cost) + + limit := plan.RateLimit() + exceeded = counter.count > int64(limit) + + if exceeded && !svc.notified[key] { + svc.notified[key] = true + go svc.emitExceededEvent(ctx, userID, counter.count, limit, plan) + } + + return counter.count, exceeded, nil +} + +// Close flushes remaining dirty counters and stops the background goroutine. +func (svc *RateLimitService) Close() { + close(svc.done) + if svc.client != nil { + svc.flush(context.Background()) + } +} + +func (svc *RateLimitService) flushLoop() { + ticker := time.NewTicker(rateLimitFlushInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if svc.client != nil { + svc.flush(context.Background()) + } + case <-svc.done: + return + } + } +} + +func (svc *RateLimitService) flush(ctx context.Context) { + svc.mu.Lock() + // Collect dirty entries + type flushEntry struct { + key string + delta int64 + ttl time.Duration + } + var entries []flushEntry + now := time.Now() + + for key, counter := range svc.counters { + // Clean up expired windows + if now.After(counter.windowExpiry) { + delete(svc.counters, key) + delete(svc.notified, key) + continue + } + if counter.dirty > 0 { + entries = append(entries, flushEntry{ + key: rateLimitKeyPrefix + key, + delta: counter.dirty, + ttl: time.Until(counter.windowExpiry), + }) + counter.dirty = 0 + } + } + svc.mu.Unlock() + + // Flush to Redis outside the lock + for _, entry := range entries { + pipe := svc.client.Pipeline() + pipe.IncrBy(ctx, entry.key, entry.delta) + pipe.ExpireNX(ctx, entry.key, entry.ttl) + if _, err := pipe.Exec(ctx); err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) + } + } + } +} + +func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userCounter, error) { + if svc.client == nil { + return &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + }, nil + } + + key := rateLimitKeyPrefix + userID + countStr, err := svc.client.Get(ctx, key).Result() + if err == redis.Nil { + return &userCounter{ + count: 0, + windowExpiry: time.Now().Add(rateLimitWindow), + dirty: 0, + }, nil + } + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot hydrate rate limit for user [%s]", userID)) + } + + count, err := strconv.ParseInt(countStr, 10, 64) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID)) + } + + ttl, err := svc.client.TTL(ctx, key).Result() + if err != nil || ttl <= 0 { + ttl = rateLimitWindow + } + + // Check if already notified (in Redis) + notifiedKey := rateLimitNotifiedPrefix + userID + if exists, _ := svc.client.Exists(ctx, notifiedKey).Result(); exists > 0 { + svc.notified[userID] = true + } + + return &userCounter{ + count: count, + windowExpiry: time.Now().Add(ttl), + dirty: 0, + }, nil +} + +func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entities.UserID, count int64, limit uint, plan entities.SubscriptionName) { + if svc.dispatcher == nil { + return + } + + // Set notified flag in Redis (24h TTL) to survive restarts + if svc.client != nil { + notifiedKey := rateLimitNotifiedPrefix + string(userID) + svc.client.Set(ctx, notifiedKey, "1", rateLimitWindow) + } + + payload := events.RateLimitExceededPayload{ + UserID: userID, + Count: count, + Limit: limit, + Plan: string(plan), + Timestamp: time.Now().UTC(), + } + + event, err := svc.createEvent(events.RateLimitExceeded, string(userID), payload) + if err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID))) + } + return + } + + if err = svc.dispatcher.Dispatch(ctx, event); err != nil { + if svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID))) + } + } +} diff --git a/api/pkg/services/rate_limit_service_test.go b/api/pkg/services/rate_limit_service_test.go new file mode 100644 index 00000000..96030a0d --- /dev/null +++ b/api/pkg/services/rate_limit_service_test.go @@ -0,0 +1,111 @@ +package services + +import ( + "context" + "testing" + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateLimitService_Increment_BasicCount(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(1), count) + assert.False(t, exceeded) +} + +func TestRateLimitService_Increment_WeightedCost(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + count, _, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 10) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(10), count) +} + +func TestRateLimitService_Increment_ExceedsLimit(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Free plan limit is 400. Exceed it. + for i := 0; i < 400; i++ { + _, _, _ = svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + } + + // Act — this pushes count to 401 + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(401), count) + assert.True(t, exceeded) +} + +func TestRateLimitService_Increment_MultipleUsers(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Act + _, _, _ = svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 5) + count, _, err := svc.Increment(ctx, "user-2", entities.SubscriptionNameProMonthly, 3) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(3), count) +} + +func TestRateLimitService_Increment_WindowExpiry(t *testing.T) { + // Arrange + svc := newTestRateLimitService(t) + defer svc.Close() + + ctx := context.Background() + + // Simulate an existing counter with an expired window + svc.mu.Lock() + svc.counters["user-1"] = &userCounter{ + count: 500, + windowExpiry: time.Now().Add(-1 * time.Hour), // expired + dirty: 0, + } + svc.mu.Unlock() + + // Act — should reset because the window expired + count, exceeded, err := svc.Increment(ctx, "user-1", entities.SubscriptionNameFree, 1) + + // Assert + require.NoError(t, err) + assert.Equal(t, int64(1), count) + assert.False(t, exceeded) +} + +// newTestRateLimitService creates a RateLimitService with nil redis client (no hydration) +// suitable for unit tests that only test in-memory logic. +func newTestRateLimitService(t *testing.T) *RateLimitService { + t.Helper() + return NewRateLimitService(nil, nil, nil, nil) +} From 5cc7db3f47aa676c4da0b53b3d68f037404048f7 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:44:15 +0300 Subject: [PATCH 06/16] feat(middlewares): add rate limit middleware for tracking API usage Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/middlewares/rate_limit_middleware.go | 71 ++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 api/pkg/middlewares/rate_limit_middleware.go diff --git a/api/pkg/middlewares/rate_limit_middleware.go b/api/pkg/middlewares/rate_limit_middleware.go new file mode 100644 index 00000000..4c58dbb5 --- /dev/null +++ b/api/pkg/middlewares/rate_limit_middleware.go @@ -0,0 +1,71 @@ +package middlewares + +import ( + "os" + "strconv" + "strings" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/repositories" + "github.com/NdoleStudio/httpsms/pkg/services" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/gofiber/fiber/v3" +) + +const rateLimitCostCap = 200 + +// RateLimit tracks per-user API request counts without blocking requests. +func RateLimit( + tracer telemetry.Tracer, + logger telemetry.Logger, + service *services.RateLimitService, + userRepository repositories.UserRepository, + excludePaths []string, +) fiber.Handler { + enabled := os.Getenv("RATE_LIMIT_ENABLED") == "true" + logger = logger.WithService("middlewares.RateLimit") + + return func(c fiber.Ctx) error { + if !enabled { + return c.Next() + } + + path := c.Path() + for _, excluded := range excludePaths { + if strings.HasPrefix(path, excluded) { + return c.Next() + } + } + + ctx, span := tracer.StartFromFiberCtx(c, "middlewares.RateLimit") + defer span.End() + + authUser, ok := c.Locals(ContextKeyAuthUserID).(entities.AuthContext) + if !ok || authUser.IsNoop() { + return c.Next() + } + + cost := 1 + if c.Method() == fiber.MethodGet { + if limitParam := c.Query("limit"); limitParam != "" { + if parsed, err := strconv.Atoi(limitParam); err == nil && parsed > 0 { + cost = parsed + if cost > rateLimitCostCap { + cost = rateLimitCostCap + } + } + } + } + + user, err := userRepository.Load(ctx, authUser.ID) + if err != nil { + ctxLogger := tracer.CtxLogger(logger, span) + ctxLogger.Error(err) + return c.Next() + } + + _, _, _ = service.Increment(ctx, authUser.ID, user.SubscriptionName, cost) + + return c.Next() + } +} From e2a75e67d543d81512a64288c0884d77d044ac6d Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:46:24 +0300 Subject: [PATCH 07/16] feat(di): wire rate limit service and middleware into DI container - Add rateLimitService and redisClient fields to Container struct - Extract RedisClient() method for shared Redis client creation - Refactor Cache() to reuse RedisClient() singleton - Add RateLimitService() lazy singleton initialization - Register RateLimit middleware in App() after API key auth - Add RATE_LIMIT_ENABLED=false to .env.docker Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/.env.docker | 3 +++ api/pkg/di/container.go | 46 ++++++++++++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/api/.env.docker b/api/.env.docker index 235972f4..d7820d82 100644 --- a/api/.env.docker +++ b/api/.env.docker @@ -52,6 +52,9 @@ DATABASE_URL_DEDICATED=postgresql://dbusername:dbpassword@postgres:5432/httpsms # Redis connection string REDIS_URL=redis://@redis:6379 +# Rate limiting (set to "true" to enable per-user API rate tracking) +RATE_LIMIT_ENABLED=false + # Google Cloud Storage bucket for MMS attachments. Leave empty to use in-memory storage. GCS_BUCKET_NAME= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index f84f5b30..079e1761 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -96,6 +96,8 @@ type Container struct { userRistrettoCache *ristretto.Cache[string, entities.AuthContext] phoneRistrettoCache *ristretto.Cache[string, *entities.Phone] inMemoryCache cache.Cache + rateLimitService *services.RateLimitService + redisClient *redis.Client } // NewLiteContainer creates a Container without any routes or listeners @@ -203,6 +205,13 @@ func (container *Container) App() (app *fiber.App) { app.Use(middlewares.HTTPRequestLogger(container.Tracer(), container.Logger())) app.Use(middlewares.BearerAuth(container.Logger(), container.Tracer(), container.FirebaseAuthClient())) app.Use(middlewares.APIKeyAuth(container.Logger(), container.Tracer(), container.UserRepository())) + app.Use(middlewares.RateLimit( + container.Tracer(), + container.Logger(), + container.RateLimitService(), + container.UserRepository(), + []string{"/v1/events"}, + )) container.app = app return app @@ -443,6 +452,16 @@ func (container *Container) InMemoryCache() cache.Cache { // Cache creates a new instance of cache.Cache func (container *Container) Cache() cache.Cache { container.logger.Debug("creating cache.Cache") + return cache.NewRedisCache(container.Tracer(), container.RedisClient()) +} + +// RedisClient creates or returns the shared *redis.Client +func (container *Container) RedisClient() *redis.Client { + if container.redisClient != nil { + return container.redisClient + } + + container.logger.Debug("creating *redis.Client") opt, err := redis.ParseURL(os.Getenv("REDIS_URL")) if err != nil { container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot parse redis url [%s]", os.Getenv("REDIS_URL")))) @@ -453,19 +472,32 @@ func (container *Container) Cache() cache.Cache { } } - redisClient := redis.NewClient(opt) + container.redisClient = redis.NewClient(opt) - // Enable tracing instrumentation. - if err = redisotel.InstrumentTracing(redisClient); err != nil { + if err = redisotel.InstrumentTracing(container.redisClient); err != nil { container.logger.Error(stacktrace.Propagate(err, "cannot instrument redis tracing")) } - - // Enable metrics instrumentation. - if err = redisotel.InstrumentMetrics(redisClient); err != nil { + if err = redisotel.InstrumentMetrics(container.redisClient); err != nil { container.logger.Fatal(stacktrace.Propagate(err, "cannot instrument redis metrics")) } - return cache.NewRedisCache(container.Tracer(), redisClient) + return container.redisClient +} + +// RateLimitService creates or returns the shared *services.RateLimitService +func (container *Container) RateLimitService() *services.RateLimitService { + if container.rateLimitService != nil { + return container.rateLimitService + } + + container.logger.Debug("creating services.RateLimitService") + container.rateLimitService = services.NewRateLimitService( + container.Tracer(), + container.Logger(), + container.RedisClient(), + container.EventDispatcher(), + ) + return container.rateLimitService } // FirebaseAuthClient creates a new instance of auth.Client From 04ef44fdc58b29642f3ff3835cb8f03a8811b466 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 11:56:01 +0300 Subject: [PATCH 08/16] fix: add graceful shutdown for RateLimitService and handle Redis notification errors - Add Container.Close() to flush rate limit counters on shutdown - Call defer container.Close() in main.go - Check and log errors from Redis notification flag persistence Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/main.go | 1 + api/pkg/di/container.go | 7 +++++++ api/pkg/services/rate_limit_service.go | 4 +++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/api/main.go b/api/main.go index 5b7539c9..9b2886fe 100644 --- a/api/main.go +++ b/api/main.go @@ -42,5 +42,6 @@ func main() { } container := di.NewContainer(os.Getenv("GCP_PROJECT_ID"), Version) + defer container.Close() container.Logger().Info(container.App().Listen(fmt.Sprintf("%s:%s", os.Getenv("APP_HOST"), os.Getenv("APP_PORT"))).Error()) } diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 079e1761..14ea8e3c 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -107,6 +107,13 @@ func NewLiteContainer() (container *Container) { } } +// Close gracefully shuts down container resources +func (container *Container) Close() { + if container.rateLimitService != nil { + container.rateLimitService.Close() + } +} + // NewContainer creates a new dependency injection container func NewContainer(projectID string, version string) (container *Container) { container = &Container{ diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index 3498c3cb..0d6b9b93 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -225,7 +225,9 @@ func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entit // Set notified flag in Redis (24h TTL) to survive restarts if svc.client != nil { notifiedKey := rateLimitNotifiedPrefix + string(userID) - svc.client.Set(ctx, notifiedKey, "1", rateLimitWindow) + if err := svc.client.Set(ctx, notifiedKey, "1", rateLimitWindow).Err(); err != nil && svc.logger != nil { + svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID))) + } } payload := events.RateLimitExceededPayload{ From d2ece4fb2e38761f790e88f3f43a84b6200c58a0 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:03:29 +0300 Subject: [PATCH 09/16] feat(services): add 1s Redis timeout, fail-open behavior, and shutdown logging - All Redis calls in RateLimitService use 1s context timeout - If Redis is slow/down, rate limiting continues in-memory (fail open) - Added logging to flush loop start/stop and graceful shutdown Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/services/rate_limit_service.go | 38 ++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index 0d6b9b93..b9d12838 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -19,6 +19,7 @@ const ( rateLimitNotifiedPrefix = "rate_limit_notified:" rateLimitWindow = 24 * time.Hour rateLimitFlushInterval = 30 * time.Second + rateLimitRedisTimeout = 1 * time.Second ) // RateLimitService tracks per-user API request counts with in-memory counters @@ -108,16 +109,26 @@ func (svc *RateLimitService) Increment(ctx context.Context, userID entities.User // Close flushes remaining dirty counters and stops the background goroutine. func (svc *RateLimitService) Close() { + if svc.logger != nil { + svc.logger.Info("RateLimitService shutting down, flushing counters") + } close(svc.done) if svc.client != nil { svc.flush(context.Background()) } + if svc.logger != nil { + svc.logger.Info("RateLimitService shutdown complete") + } } func (svc *RateLimitService) flushLoop() { ticker := time.NewTicker(rateLimitFlushInterval) defer ticker.Stop() + if svc.logger != nil { + svc.logger.Info(fmt.Sprintf("RateLimitService flush loop started (interval: %s)", rateLimitFlushInterval)) + } + for { select { case <-ticker.C: @@ -125,6 +136,9 @@ func (svc *RateLimitService) flushLoop() { svc.flush(context.Background()) } case <-svc.done: + if svc.logger != nil { + svc.logger.Info("RateLimitService flush loop stopped") + } return } } @@ -159,12 +173,15 @@ func (svc *RateLimitService) flush(ctx context.Context) { } svc.mu.Unlock() - // Flush to Redis outside the lock + // Flush to Redis outside the lock with timeout + redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) + defer cancel() + for _, entry := range entries { pipe := svc.client.Pipeline() - pipe.IncrBy(ctx, entry.key, entry.delta) - pipe.ExpireNX(ctx, entry.key, entry.ttl) - if _, err := pipe.Exec(ctx); err != nil { + pipe.IncrBy(redisCtx, entry.key, entry.delta) + pipe.ExpireNX(redisCtx, entry.key, entry.ttl) + if _, err := pipe.Exec(redisCtx); err != nil { if svc.logger != nil { svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) } @@ -181,8 +198,11 @@ func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userC }, nil } + redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) + defer cancel() + key := rateLimitKeyPrefix + userID - countStr, err := svc.client.Get(ctx, key).Result() + countStr, err := svc.client.Get(redisCtx, key).Result() if err == redis.Nil { return &userCounter{ count: 0, @@ -199,14 +219,14 @@ func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userC return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID)) } - ttl, err := svc.client.TTL(ctx, key).Result() + ttl, err := svc.client.TTL(redisCtx, key).Result() if err != nil || ttl <= 0 { ttl = rateLimitWindow } // Check if already notified (in Redis) notifiedKey := rateLimitNotifiedPrefix + userID - if exists, _ := svc.client.Exists(ctx, notifiedKey).Result(); exists > 0 { + if exists, _ := svc.client.Exists(redisCtx, notifiedKey).Result(); exists > 0 { svc.notified[userID] = true } @@ -224,8 +244,10 @@ func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entit // Set notified flag in Redis (24h TTL) to survive restarts if svc.client != nil { + redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) + defer cancel() notifiedKey := rateLimitNotifiedPrefix + string(userID) - if err := svc.client.Set(ctx, notifiedKey, "1", rateLimitWindow).Err(); err != nil && svc.logger != nil { + if err := svc.client.Set(redisCtx, notifiedKey, "1", rateLimitWindow).Err(); err != nil && svc.logger != nil { svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID))) } } From df3368fa402b205eb3dfc8633a7343c3c746d23d Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:10:41 +0300 Subject: [PATCH 10/16] refactor: use Fiber OnPreShutdown hook for RateLimitService cleanup Replace defer container.Close() in main.go with app.Hooks().OnPreShutdown() which is the idiomatic Fiber v3 approach for graceful shutdown. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/main.go | 1 - api/pkg/di/container.go | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/api/main.go b/api/main.go index 9b2886fe..5b7539c9 100644 --- a/api/main.go +++ b/api/main.go @@ -42,6 +42,5 @@ func main() { } container := di.NewContainer(os.Getenv("GCP_PROJECT_ID"), Version) - defer container.Close() container.Logger().Info(container.App().Listen(fmt.Sprintf("%s:%s", os.Getenv("APP_HOST"), os.Getenv("APP_PORT"))).Error()) } diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 14ea8e3c..d2179eb7 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -220,6 +220,11 @@ func (container *Container) App() (app *fiber.App) { []string{"/v1/events"}, )) + app.Hooks().OnPreShutdown(func() error { + container.RateLimitService().Close() + return nil + }) + container.app = app return app } From f15306ea687e944ddb657c35fcce0e9fb90f8d05 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:13:55 +0300 Subject: [PATCH 11/16] refactor: remove nil checks for logger in RateLimitService, use OnPreShutdown - Logger is always injected via DI, nil checks are unnecessary - Use Fiber's app.Hooks().OnPreShutdown() for graceful shutdown - Remove defer container.Close() from main.go Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/services/rate_limit_service.go | 104 +++++++++----------- api/pkg/services/rate_limit_service_test.go | 17 +++- 2 files changed, 61 insertions(+), 60 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index b9d12838..268e946a 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -66,15 +66,15 @@ func NewRateLimitService( // Increment adds cost to the user's counter and returns the current count. // If the count exceeds the plan's rate limit, exceeded is true. -func (svc *RateLimitService) Increment(ctx context.Context, userID entities.UserID, plan entities.SubscriptionName, cost int) (count int64, exceeded bool, err error) { - svc.mu.Lock() - defer svc.mu.Unlock() +func (service *RateLimitService) Increment(ctx context.Context, userID entities.UserID, plan entities.SubscriptionName, cost int) (count int64, exceeded bool, err error) { + service.mu.Lock() + defer service.mu.Unlock() key := string(userID) - counter, exists := svc.counters[key] + counter, exists := service.counters[key] if !exists { - counter, err = svc.hydrate(ctx, key) + counter, err = service.hydrate(ctx, key) if err != nil { counter = &userCounter{ count: 0, @@ -82,7 +82,7 @@ func (svc *RateLimitService) Increment(ctx context.Context, userID entities.User dirty: 0, } } - svc.counters[key] = counter + service.counters[key] = counter } // Reset if window has expired @@ -90,7 +90,7 @@ func (svc *RateLimitService) Increment(ctx context.Context, userID entities.User counter.count = 0 counter.dirty = 0 counter.windowExpiry = time.Now().Add(rateLimitWindow) - svc.notified[key] = false + service.notified[key] = false } counter.count += int64(cost) @@ -99,53 +99,45 @@ func (svc *RateLimitService) Increment(ctx context.Context, userID entities.User limit := plan.RateLimit() exceeded = counter.count > int64(limit) - if exceeded && !svc.notified[key] { - svc.notified[key] = true - go svc.emitExceededEvent(ctx, userID, counter.count, limit, plan) + if exceeded && !service.notified[key] { + service.notified[key] = true + go service.emitExceededEvent(ctx, userID, counter.count, limit, plan) } return counter.count, exceeded, nil } // Close flushes remaining dirty counters and stops the background goroutine. -func (svc *RateLimitService) Close() { - if svc.logger != nil { - svc.logger.Info("RateLimitService shutting down, flushing counters") - } - close(svc.done) - if svc.client != nil { - svc.flush(context.Background()) - } - if svc.logger != nil { - svc.logger.Info("RateLimitService shutdown complete") +func (service *RateLimitService) Close() { + service.logger.Info("RateLimitService shutting down, flushing counters") + close(service.done) + if service.client != nil { + service.flush(context.Background()) } + service.logger.Info("RateLimitService shutdown complete") } -func (svc *RateLimitService) flushLoop() { +func (service *RateLimitService) flushLoop() { ticker := time.NewTicker(rateLimitFlushInterval) defer ticker.Stop() - if svc.logger != nil { - svc.logger.Info(fmt.Sprintf("RateLimitService flush loop started (interval: %s)", rateLimitFlushInterval)) - } + service.logger.Info(fmt.Sprintf("RateLimitService flush loop started (interval: %s)", rateLimitFlushInterval)) for { select { case <-ticker.C: - if svc.client != nil { - svc.flush(context.Background()) - } - case <-svc.done: - if svc.logger != nil { - svc.logger.Info("RateLimitService flush loop stopped") + if service.client != nil { + service.flush(context.Background()) } + case <-service.done: + service.logger.Info("RateLimitService flush loop stopped") return } } } -func (svc *RateLimitService) flush(ctx context.Context) { - svc.mu.Lock() +func (service *RateLimitService) flush(ctx context.Context) { + service.mu.Lock() // Collect dirty entries type flushEntry struct { key string @@ -155,11 +147,11 @@ func (svc *RateLimitService) flush(ctx context.Context) { var entries []flushEntry now := time.Now() - for key, counter := range svc.counters { + for key, counter := range service.counters { // Clean up expired windows if now.After(counter.windowExpiry) { - delete(svc.counters, key) - delete(svc.notified, key) + delete(service.counters, key) + delete(service.notified, key) continue } if counter.dirty > 0 { @@ -171,26 +163,24 @@ func (svc *RateLimitService) flush(ctx context.Context) { counter.dirty = 0 } } - svc.mu.Unlock() + service.mu.Unlock() // Flush to Redis outside the lock with timeout redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) defer cancel() for _, entry := range entries { - pipe := svc.client.Pipeline() + pipe := service.client.Pipeline() pipe.IncrBy(redisCtx, entry.key, entry.delta) pipe.ExpireNX(redisCtx, entry.key, entry.ttl) if _, err := pipe.Exec(redisCtx); err != nil { - if svc.logger != nil { - svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) - } + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) } } } -func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userCounter, error) { - if svc.client == nil { +func (service *RateLimitService) hydrate(ctx context.Context, userID string) (*userCounter, error) { + if service.client == nil { return &userCounter{ count: 0, windowExpiry: time.Now().Add(rateLimitWindow), @@ -202,7 +192,7 @@ func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userC defer cancel() key := rateLimitKeyPrefix + userID - countStr, err := svc.client.Get(redisCtx, key).Result() + countStr, err := service.client.Get(redisCtx, key).Result() if err == redis.Nil { return &userCounter{ count: 0, @@ -219,15 +209,15 @@ func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userC return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID)) } - ttl, err := svc.client.TTL(redisCtx, key).Result() + ttl, err := service.client.TTL(redisCtx, key).Result() if err != nil || ttl <= 0 { ttl = rateLimitWindow } // Check if already notified (in Redis) notifiedKey := rateLimitNotifiedPrefix + userID - if exists, _ := svc.client.Exists(redisCtx, notifiedKey).Result(); exists > 0 { - svc.notified[userID] = true + if exists, _ := service.client.Exists(redisCtx, notifiedKey).Result(); exists > 0 { + service.notified[userID] = true } return &userCounter{ @@ -237,18 +227,18 @@ func (svc *RateLimitService) hydrate(ctx context.Context, userID string) (*userC }, nil } -func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entities.UserID, count int64, limit uint, plan entities.SubscriptionName) { - if svc.dispatcher == nil { +func (service *RateLimitService) emitExceededEvent(ctx context.Context, userID entities.UserID, count int64, limit uint, plan entities.SubscriptionName) { + if service.dispatcher == nil { return } // Set notified flag in Redis (24h TTL) to survive restarts - if svc.client != nil { + if service.client != nil { redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) defer cancel() notifiedKey := rateLimitNotifiedPrefix + string(userID) - if err := svc.client.Set(redisCtx, notifiedKey, "1", rateLimitWindow).Err(); err != nil && svc.logger != nil { - svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID))) + if err := service.client.Set(redisCtx, notifiedKey, "1", rateLimitWindow).Err(); err != nil { + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID))) } } @@ -260,17 +250,13 @@ func (svc *RateLimitService) emitExceededEvent(ctx context.Context, userID entit Timestamp: time.Now().UTC(), } - event, err := svc.createEvent(events.RateLimitExceeded, string(userID), payload) + event, err := service.createEvent(events.RateLimitExceeded, string(userID), payload) if err != nil { - if svc.logger != nil { - svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID))) - } + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID))) return } - if err = svc.dispatcher.Dispatch(ctx, event); err != nil { - if svc.logger != nil { - svc.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID))) - } + if err = service.dispatcher.Dispatch(ctx, event); err != nil { + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID))) } } diff --git a/api/pkg/services/rate_limit_service_test.go b/api/pkg/services/rate_limit_service_test.go index 96030a0d..29224a7f 100644 --- a/api/pkg/services/rate_limit_service_test.go +++ b/api/pkg/services/rate_limit_service_test.go @@ -6,8 +6,10 @@ import ( "time" "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" ) func TestRateLimitService_Increment_BasicCount(t *testing.T) { @@ -107,5 +109,18 @@ func TestRateLimitService_Increment_WindowExpiry(t *testing.T) { // suitable for unit tests that only test in-memory logic. func newTestRateLimitService(t *testing.T) *RateLimitService { t.Helper() - return NewRateLimitService(nil, nil, nil, nil) + return NewRateLimitService(nil, &testLogger{}, nil, nil) } + +type testLogger struct{} + +func (l *testLogger) Info(string) {} +func (l *testLogger) Error(error) {} +func (l *testLogger) Warn(error) {} +func (l *testLogger) Fatal(error) {} +func (l *testLogger) Trace(string) {} +func (l *testLogger) Debug(string) {} +func (l *testLogger) WithService(string) telemetry.Logger { return l } +func (l *testLogger) WithString(string, string) telemetry.Logger { return l } +func (l *testLogger) WithSpan(trace.SpanContext) telemetry.Logger { return l } +func (l *testLogger) Printf(string, ...interface{}) {} From 3f8b8d191822f193747b7e15c48af3ece42b4850 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:15:48 +0300 Subject: [PATCH 12/16] refactor: gate rate limiting at container level, skip middleware entirely when disabled Move RATE_LIMIT_ENABLED check to container.go so the middleware, service, and Redis client are never initialized when rate limiting is disabled. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 24 +++++++++++--------- api/pkg/middlewares/rate_limit_middleware.go | 13 ++--------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index d2179eb7..00e35d5f 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -212,18 +212,20 @@ func (container *Container) App() (app *fiber.App) { app.Use(middlewares.HTTPRequestLogger(container.Tracer(), container.Logger())) app.Use(middlewares.BearerAuth(container.Logger(), container.Tracer(), container.FirebaseAuthClient())) app.Use(middlewares.APIKeyAuth(container.Logger(), container.Tracer(), container.UserRepository())) - app.Use(middlewares.RateLimit( - container.Tracer(), - container.Logger(), - container.RateLimitService(), - container.UserRepository(), - []string{"/v1/events"}, - )) - app.Hooks().OnPreShutdown(func() error { - container.RateLimitService().Close() - return nil - }) + if os.Getenv("RATE_LIMIT_ENABLED") == "true" { + app.Use(middlewares.RateLimit( + container.Tracer(), + container.Logger(), + container.RateLimitService(), + container.UserRepository(), + []string{"/v1/events"}, + )) + app.Hooks().OnPreShutdown(func() error { + container.RateLimitService().Close() + return nil + }) + } container.app = app return app diff --git a/api/pkg/middlewares/rate_limit_middleware.go b/api/pkg/middlewares/rate_limit_middleware.go index 4c58dbb5..2551888b 100644 --- a/api/pkg/middlewares/rate_limit_middleware.go +++ b/api/pkg/middlewares/rate_limit_middleware.go @@ -1,7 +1,6 @@ package middlewares import ( - "os" "strconv" "strings" @@ -12,7 +11,7 @@ import ( "github.com/gofiber/fiber/v3" ) -const rateLimitCostCap = 200 +const rateLimitCostCap = 100 // RateLimit tracks per-user API request counts without blocking requests. func RateLimit( @@ -22,14 +21,9 @@ func RateLimit( userRepository repositories.UserRepository, excludePaths []string, ) fiber.Handler { - enabled := os.Getenv("RATE_LIMIT_ENABLED") == "true" logger = logger.WithService("middlewares.RateLimit") return func(c fiber.Ctx) error { - if !enabled { - return c.Next() - } - path := c.Path() for _, excluded := range excludePaths { if strings.HasPrefix(path, excluded) { @@ -49,10 +43,7 @@ func RateLimit( if c.Method() == fiber.MethodGet { if limitParam := c.Query("limit"); limitParam != "" { if parsed, err := strconv.Atoi(limitParam); err == nil && parsed > 0 { - cost = parsed - if cost > rateLimitCostCap { - cost = rateLimitCostCap - } + cost = min(parsed, rateLimitCostCap) } } } From f35d4cca2a41d8c20eedaaf5c066191c71073655 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:16:39 +0300 Subject: [PATCH 13/16] refactor: rename rate limiter service --- api/pkg/services/rate_limit_service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index 268e946a..7232f965 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -15,8 +15,8 @@ import ( ) const ( - rateLimitKeyPrefix = "rate_limit:" - rateLimitNotifiedPrefix = "rate_limit_notified:" + rateLimitKeyPrefix = "user_rate_limit:" + rateLimitNotifiedPrefix = "user_rate_limit_notified:" rateLimitWindow = 24 * time.Hour rateLimitFlushInterval = 30 * time.Second rateLimitRedisTimeout = 1 * time.Second @@ -50,7 +50,7 @@ func NewRateLimitService( client *redis.Client, dispatcher *EventDispatcher, ) *RateLimitService { - svc := &RateLimitService{ + rateLimiter := &RateLimitService{ tracer: tracer, logger: logger, client: client, @@ -60,8 +60,8 @@ func NewRateLimitService( done: make(chan struct{}), } - go svc.flushLoop() - return svc + go rateLimiter.flushLoop() + return rateLimiter } // Increment adds cost to the user's counter and returns the current count. From 6b388e51179ec1b975cc16c67386a1d5887170fb Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:20:59 +0300 Subject: [PATCH 14/16] perf: batch all Redis flush operations into a single pipeline call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of creating a separate pipeline per user, all INCRBY+ExpireNX commands are batched into one pipeline.Exec() — one round-trip to Redis regardless of how many users have dirty counters. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/services/rate_limit_service.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index 7232f965..6691906e 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -165,16 +165,18 @@ func (service *RateLimitService) flush(ctx context.Context) { } service.mu.Unlock() - // Flush to Redis outside the lock with timeout + // Flush to Redis outside the lock with timeout using a single pipeline batch redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) defer cancel() + pipe := service.client.Pipeline() for _, entry := range entries { - pipe := service.client.Pipeline() pipe.IncrBy(redisCtx, entry.key, entry.delta) pipe.ExpireNX(redisCtx, entry.key, entry.ttl) + } + if len(entries) > 0 { if _, err := pipe.Exec(redisCtx); err != nil { - service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit for key [%s]", entry.key))) + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit batch of %d entries", len(entries)))) } } } From 214163ef3d7ade42f89b607da9e71d545dc459b0 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:26:13 +0300 Subject: [PATCH 15/16] refactor(api): use ctxLogger with span context in RateLimitService Replace plain service.logger calls with ctxLogger (tracer-aware context logger) in flush, hydrate, and emitExceededEvent methods for trace correlation. Close/flushLoop keep plain logger since no request context. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/services/rate_limit_service.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index 6691906e..b6d992b7 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -166,7 +166,7 @@ func (service *RateLimitService) flush(ctx context.Context) { service.mu.Unlock() // Flush to Redis outside the lock with timeout using a single pipeline batch - redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) + redisCtx, cancel := context.WithTimeout(ctx, 10*rateLimitRedisTimeout) defer cancel() pipe := service.client.Pipeline() @@ -176,7 +176,7 @@ func (service *RateLimitService) flush(ctx context.Context) { } if len(entries) > 0 { if _, err := pipe.Exec(redisCtx); err != nil { - service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit batch of %d entries", len(entries)))) + service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot flush rate limit batch of [%d] entries", len(entries)))) } } } @@ -190,6 +190,9 @@ func (service *RateLimitService) hydrate(ctx context.Context, userID string) (*u }, nil } + ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger) + defer span.End() + redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) defer cancel() @@ -203,12 +206,12 @@ func (service *RateLimitService) hydrate(ctx context.Context, userID string) (*u }, nil } if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot hydrate rate limit for user [%s]", userID)) + return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot hydrate rate limit for user [%s]", userID))) } count, err := strconv.ParseInt(countStr, 10, 64) if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID)) + return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot parse rate limit count [%s] for user [%s]", countStr, userID))) } ttl, err := service.client.TTL(redisCtx, key).Result() @@ -222,6 +225,8 @@ func (service *RateLimitService) hydrate(ctx context.Context, userID string) (*u service.notified[userID] = true } + ctxLogger.Info(fmt.Sprintf("hydrated rate limit for user [%s] with count [%d]", userID, count)) + return &userCounter{ count: count, windowExpiry: time.Now().Add(ttl), @@ -234,13 +239,16 @@ func (service *RateLimitService) emitExceededEvent(ctx context.Context, userID e return } + ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger) + defer span.End() + // Set notified flag in Redis (24h TTL) to survive restarts if service.client != nil { redisCtx, cancel := context.WithTimeout(ctx, rateLimitRedisTimeout) defer cancel() notifiedKey := rateLimitNotifiedPrefix + string(userID) if err := service.client.Set(redisCtx, notifiedKey, "1", rateLimitWindow).Err(); err != nil { - service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID))) + ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot persist rate limit notified flag for user [%s]", userID)))) } } @@ -254,11 +262,11 @@ func (service *RateLimitService) emitExceededEvent(ctx context.Context, userID e event, err := service.createEvent(events.RateLimitExceeded, string(userID), payload) if err != nil { - service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID))) + ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot create rate limit exceeded event for user [%s]", userID)))) return } if err = service.dispatcher.Dispatch(ctx, event); err != nil { - service.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID))) + ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch rate limit exceeded event for user [%s]", userID)))) } } From 736697c830d3fb8bcd91479bed05b7c15343aa92 Mon Sep 17 00:00:00 2001 From: Acho Arnold Ewin Date: Tue, 30 Jun 2026 12:30:21 +0300 Subject: [PATCH 16/16] refactor: improve logging --- api/pkg/services/rate_limit_service.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/api/pkg/services/rate_limit_service.go b/api/pkg/services/rate_limit_service.go index b6d992b7..7d936160 100644 --- a/api/pkg/services/rate_limit_service.go +++ b/api/pkg/services/rate_limit_service.go @@ -49,10 +49,10 @@ func NewRateLimitService( logger telemetry.Logger, client *redis.Client, dispatcher *EventDispatcher, -) *RateLimitService { - rateLimiter := &RateLimitService{ +) (rateLimiter *RateLimitService) { + rateLimiter = &RateLimitService{ tracer: tracer, - logger: logger, + logger: logger.WithService(fmt.Sprintf("%T", rateLimiter)), client: client, dispatcher: dispatcher, counters: make(map[string]*userCounter), @@ -109,19 +109,19 @@ func (service *RateLimitService) Increment(ctx context.Context, userID entities. // Close flushes remaining dirty counters and stops the background goroutine. func (service *RateLimitService) Close() { - service.logger.Info("RateLimitService shutting down, flushing counters") + service.logger.Info("rate limit service shutting down, flushing counters") close(service.done) if service.client != nil { service.flush(context.Background()) } - service.logger.Info("RateLimitService shutdown complete") + service.logger.Info("rate limit service shutdown complete") } func (service *RateLimitService) flushLoop() { ticker := time.NewTicker(rateLimitFlushInterval) defer ticker.Stop() - service.logger.Info(fmt.Sprintf("RateLimitService flush loop started (interval: %s)", rateLimitFlushInterval)) + service.logger.Info(fmt.Sprintf("rate limit service flush loop started with interval [%s]", rateLimitFlushInterval)) for { select { @@ -130,7 +130,7 @@ func (service *RateLimitService) flushLoop() { service.flush(context.Background()) } case <-service.done: - service.logger.Info("RateLimitService flush loop stopped") + service.logger.Info("rate limit service flush loop stopped") return } }