diff --git a/internal/middleware/idempotency.go b/internal/middleware/idempotency.go index ee96d5e2..c281fe4b 100644 --- a/internal/middleware/idempotency.go +++ b/internal/middleware/idempotency.go @@ -142,6 +142,14 @@ const ( idempotencyFingerprintTTL = 120 * time.Second // idempotencyKeyMaxLen caps the client-supplied key. Stripe uses 255. idempotencyKeyMaxLen = 255 + // idempotencyInflightTTL bounds an in-flight reservation marker (BugBash + // 2026-06-02 #21). It must exceed the slowest synchronous handler — a + // real provision (provisionDB/Cache/NoSQL runs in-request) can take + // ~10-20s — so a concurrent retry is correctly told "in progress" rather + // than racing past a too-early-expired marker. It is also the worst-case + // self-heal window if the process dies mid-handler before the marker is + // overwritten or deleted. + idempotencyInflightTTL = 60 * time.Second // X-Idempotency-Source values. idempotencySourceExplicit = "explicit" @@ -157,6 +165,16 @@ type idemEntry struct { Body []byte `json:"b"` ContentType string `json:"c"` BodyHash string `json:"h"` // sha256 hex of the original request body + // InFlight marks a reservation placeholder written (via SETNX) the + // moment a cache-miss request begins running the handler, BEFORE the + // real response exists. A concurrent same-key request that reads this + // marker must NOT re-run the handler — it returns 409 + // idempotency_key_in_progress instead. The marker is overwritten by the + // real entry when the handler completes (or deleted if the response is + // non-cacheable), and self-expires after idempotencyInflightTTL if the + // process dies mid-handler. Closes the check-then-act double-create race + // (BugBash 2026-06-02 #21). + InFlight bool `json:"f,omitempty"` } // mutableErrorCodes lists machine-readable `error` strings whose 4xx @@ -324,6 +342,39 @@ func Idempotency(rdb *redis.Client, endpoint string) fiber.Handler { } } +// respondIdempotencyInProgress returns the 409 a request gets when it finds an +// in-flight reservation marker for its key (BugBash 2026-06-02 #21). The +// envelope mirrors the idempotency_key_conflict 409 (ok/error/message/ +// request_id/retry_after_seconds/agent_action) but is RETRYABLE: unlike a +// body-mismatch conflict, the right move is to wait and retry the SAME key — +// the original request's response will be replayed once it completes. +func respondIdempotencyInProgress(c *fiber.Ctx) error { + return c.Status(fiber.StatusConflict).JSON(fiber.Map{ + "ok": false, + "error": "idempotency_key_in_progress", + "message": "A request with this Idempotency-Key is still being processed.", + "request_id": GetRequestID(c), + "retry_after_seconds": 2, + "agent_action": "Another request with the same Idempotency-Key is still in flight. Do NOT mint a new key — wait ~2s and retry the SAME key; the original response will be replayed once it completes. See https://instanode.dev/docs/idempotency.", + }) +} + +// reserveInflight best-effort places an in-flight marker at cacheKey via SETNX, +// so a concurrent same-key request that arrives after this point reads the +// marker and returns 409 in_progress instead of double-running the handler +// (BugBash 2026-06-02 #21). Fire-and-forget: a SETNX error or a lost sub- +// millisecond race just means the marker isn't placed, and the request falls +// back to the documented best-effort dedup posture (run the handler; the GET +// hit-block still catches the common case where the other request reserved +// first). The marker is overwritten by the real entry on completion, or +// cleared on a non-cacheable response. +func reserveInflight(ctx context.Context, rdb *redis.Client, cacheKey, reqBodyHash string) { + // Marshal of this fixed-shape struct cannot fail; the marker is best-effort + // regardless, so we ignore the error and let SETNX no-op on a nil payload. + marker, _ := json.Marshal(idemEntry{InFlight: true, BodyHash: reqBodyHash}) + _ = rdb.SetNX(ctx, cacheKey, marker, idempotencyInflightTTL).Err() +} + // idempotencyExplicit handles the Stripe-shape Idempotency-Key path. // Extracted from the main Idempotency wrapper so the fingerprint-fallback // branch can live alongside it without nesting another layer of if-blocks. @@ -371,6 +422,13 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe slog.Warn("idempotency.cache_unmarshal_failed", "error", jerr, "endpoint", endpoint) } else { + if entry.InFlight { + // A concurrent request with this key is still running (it + // placed this reservation marker). Don't run the handler a + // second time (BugBash #21) — tell the caller to retry the + // same key shortly. No rate-limit refund: nothing was served. + return respondIdempotencyInProgress(c) + } if entry.BodyHash != reqBodyHash { // 409 is a genuine error response, not a replay — DO NOT // refund the rate-limit counter here. The agent did the @@ -407,13 +465,24 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe } } + // MISS — reserve the key (SETNX in-flight marker) before running the + // handler so a concurrent same-key request reads the marker and 409s + // instead of double-creating (BugBash #21). The real entry overwrites + // the marker below; non-cacheable paths delete it. + reserveInflight(ctx, rdb, cacheKey, reqBodyHash) + nextErr := c.Next() if nextErr != nil && !IsResponseWrittenErr(nextErr) { + // Handler failed before writing a response — clear the reservation so + // an immediate retry isn't told "in progress" for the marker's TTL. + rdb.Del(context.Background(), cacheKey) return nextErr } status := c.Response().StatusCode() if status >= 500 { + // 5xx is retryable — never leave the marker stranding the retry. + rdb.Del(context.Background(), cacheKey) return nextErr } @@ -426,6 +495,9 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe // the agent's 24h Idempotency-Key. Success + stable failures still // cache as before. if !shouldCacheResponse(status, body, ct) { + // Clear the reservation: a mutable 4xx is meant to be retried once + // the user resolves it, so the marker must not block that retry. + rdb.Del(context.Background(), cacheKey) slog.Info("idempotency.skip_cache_mutable_error", "endpoint", endpoint, "status", status, @@ -516,6 +588,12 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str slog.Warn("idempotency.fingerprint_cache_unmarshal_failed", "error", jerr, "endpoint", endpoint) // Corrupt — fall through to handler and overwrite below. + } else if entry.InFlight { + // A concurrent same-fingerprint request is still running (BugBash + // #21). Return 409 in_progress rather than double-running the + // handler. The source header still reports the fingerprint path. + c.Set(idempotencySourceHeader, idempotencySourceFingerprint) + return respondIdempotencyInProgress(c) } else { // Cache HIT on the body-fingerprint fallback path. Same // refund semantics as the explicit-key branch: the @@ -532,15 +610,19 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str } } - // Miss — run the handler, then cache the response (non-5xx only). + // Miss — reserve the key (BugBash #21), run the handler, then cache the + // response (non-5xx only). c.Set(idempotencySourceHeader, idempotencySourceMiss) + reserveInflight(ctx, rdb, cacheKey, sha256Hex(canonBody)) nextErr := c.Next() if nextErr != nil && !IsResponseWrittenErr(nextErr) { + rdb.Del(context.Background(), cacheKey) return nextErr } status := c.Response().StatusCode() if status >= 500 { + rdb.Del(context.Background(), cacheKey) return nextErr } @@ -553,6 +635,7 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str // path is the no-header default, so the silent strand is even more // likely here. if !shouldCacheResponse(status, body, ct) { + rdb.Del(context.Background(), cacheKey) slog.Info("idempotency.fingerprint_skip_cache_mutable_error", "endpoint", endpoint, "status", status, diff --git a/internal/middleware/idempotency_inflight_test.go b/internal/middleware/idempotency_inflight_test.go new file mode 100644 index 00000000..dd8f3cd2 --- /dev/null +++ b/internal/middleware/idempotency_inflight_test.go @@ -0,0 +1,125 @@ +package middleware_test + +// idempotency_inflight_test.go — BugBash 2026-06-02 #21 regression. +// +// The middleware used to do GET(miss) → run handler → SET, with no atomic +// reservation between the GET and the SET. Two requests carrying the same +// Idempotency-Key (or the same body-fingerprint) that raced in that window +// both saw redis.Nil and both ran the handler — double-creating real +// backend resources for the authenticated provision paths that have no other +// per-burst dedup gate. +// +// The fix writes an in-flight reservation marker (SETNX) the instant a miss +// begins running the handler. A concurrent same-key request reads the marker +// and returns 409 idempotency_key_in_progress instead of re-running the +// handler. These tests hold request A inside the handler (blocked on a +// channel) so request B is guaranteed to observe A's live reservation. + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + + "github.com/gofiber/fiber/v2" + "github.com/stretchr/testify/assert" + + "instant.dev/internal/middleware" + "instant.dev/internal/testhelpers" +) + +// newBlockingInflightApp builds a Fiber app whose POST /test handler signals +// `entered` once it begins and then blocks on `release`, so a test can hold +// one request in flight. ran counts handler invocations. +func newBlockingInflightApp(t *testing.T, ran *int32, entered, release chan struct{}) (*fiber.App, func()) { + t.Helper() + rdb, cleanup := testhelpers.SetupTestRedis(t) + app := fiber.New() + app.Use(middleware.Fingerprint()) + app.Post("/test", middleware.Idempotency(rdb, "inflight.endpoint"), func(c *fiber.Ctx) error { + atomic.AddInt32(ran, 1) + entered <- struct{}{} + <-release + return c.Status(fiber.StatusCreated).JSON(fiber.Map{"ok": true}) + }) + return app, cleanup +} + +// fireBlocking sends a request that will block in the handler, returning its +// response on the done channel once released. +func fireBlocking(app *fiber.App, ip, key, body string, done chan *http.Response) { + req := httptest.NewRequest(http.MethodPost, "/test", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Forwarded-For", ip) + if key != "" { + req.Header.Set("Idempotency-Key", key) + } + resp, _ := app.Test(req, 10000) + done <- resp +} + +// TestIdempotency_ExplicitKey_ConcurrentInFlight_Returns409 — a second request +// carrying the same Idempotency-Key while the first is still running the +// handler gets 409 idempotency_key_in_progress and does NOT re-run the handler. +func TestIdempotency_ExplicitKey_ConcurrentInFlight_Returns409(t *testing.T) { + var ran int32 + entered := make(chan struct{}) + release := make(chan struct{}) + app, clean := newBlockingInflightApp(t, &ran, entered, release) + defer clean() + + ip := uniqueTestIP("inflight-explicit") + key := "inflight-key-" + ip + body := `{"x":1}` + + aDone := make(chan *http.Response, 1) + go fireBlocking(app, ip, key, body, aDone) + <-entered // A is in the handler; its reservation marker is live in Redis. + + // B: same key, arrives mid-flight → 409 in_progress, handler not re-run. + respB := postWithIdem(t, app, "/test", ip, key, body) + assert.Equal(t, http.StatusConflict, respB.StatusCode) + bodyB := readBody(t, respB) + assert.Contains(t, bodyB, "idempotency_key_in_progress") + assert.Contains(t, bodyB, "request_id") + + close(release) // let A finish. + respA := <-aDone + assert.Equal(t, http.StatusCreated, respA.StatusCode) + respA.Body.Close() + assert.Equal(t, int32(1), atomic.LoadInt32(&ran), + "handler must run exactly once despite the concurrent same-key duplicate") +} + +// TestIdempotency_Fingerprint_ConcurrentInFlight_Returns409 — the same race on +// the no-header body-fingerprint path: a concurrent identical-body request +// observes the in-flight marker and 409s instead of double-running. +func TestIdempotency_Fingerprint_ConcurrentInFlight_Returns409(t *testing.T) { + var ran int32 + entered := make(chan struct{}) + release := make(chan struct{}) + app, clean := newBlockingInflightApp(t, &ran, entered, release) + defer clean() + + ip := uniqueTestIP("inflight-fp") + body := `{"x":1}` + + aDone := make(chan *http.Response, 1) + go fireBlocking(app, ip, "", body, aDone) // no key → fingerprint path + <-entered + + // B: no key, identical body + ip + route → same fingerprint → 409. + respB := postWithIdem(t, app, "/test", ip, "", body) + assert.Equal(t, http.StatusConflict, respB.StatusCode) + bodyB := readBody(t, respB) + assert.Contains(t, bodyB, "idempotency_key_in_progress") + assert.Equal(t, "fingerprint", respB.Header.Get("X-Idempotency-Source")) + + close(release) + respA := <-aDone + assert.Equal(t, http.StatusCreated, respA.StatusCode) + respA.Body.Close() + assert.Equal(t, int32(1), atomic.LoadInt32(&ran), + "handler must run exactly once despite the concurrent same-fingerprint duplicate") +}