Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion internal/middleware/idempotency.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ const (
idempotencyFingerprintTTL = 120 * time.Second
// idempotencyKeyMaxLen caps the client-supplied key. Stripe uses 255.
idempotencyKeyMaxLen = 255
// idempotencyInflightTTL bounds an in-flight reservation marker (BugBash
// 2026-06-02 #21). It must exceed the slowest synchronous handler — a
// real provision (provisionDB/Cache/NoSQL runs in-request) can take
// ~10-20s — so a concurrent retry is correctly told "in progress" rather
// than racing past a too-early-expired marker. It is also the worst-case
// self-heal window if the process dies mid-handler before the marker is
// overwritten or deleted.
idempotencyInflightTTL = 60 * time.Second

// X-Idempotency-Source values.
idempotencySourceExplicit = "explicit"
Expand All @@ -157,6 +165,16 @@ type idemEntry struct {
Body []byte `json:"b"`
ContentType string `json:"c"`
BodyHash string `json:"h"` // sha256 hex of the original request body
// InFlight marks a reservation placeholder written (via SETNX) the
// moment a cache-miss request begins running the handler, BEFORE the
// real response exists. A concurrent same-key request that reads this
// marker must NOT re-run the handler — it returns 409
// idempotency_key_in_progress instead. The marker is overwritten by the
// real entry when the handler completes (or deleted if the response is
// non-cacheable), and self-expires after idempotencyInflightTTL if the
// process dies mid-handler. Closes the check-then-act double-create race
// (BugBash 2026-06-02 #21).
InFlight bool `json:"f,omitempty"`
}

// mutableErrorCodes lists machine-readable `error` strings whose 4xx
Expand Down Expand Up @@ -324,6 +342,39 @@ func Idempotency(rdb *redis.Client, endpoint string) fiber.Handler {
}
}

// respondIdempotencyInProgress writes the 409 Conflict sent to a request that
// found an in-flight reservation marker under its idempotency key (BugBash
// 2026-06-02 #21).
//
// The JSON envelope carries the same fields as the idempotency_key_conflict
// 409 (ok/error/message/request_id/retry_after_seconds/agent_action). Unlike
// that body-mismatch conflict, this one is RETRYABLE: the caller should wait
// and retry with the SAME key, because the original request's response is
// replayed once it completes.
//
// It returns whatever error the JSON write produces.
func respondIdempotencyInProgress(c *fiber.Ctx) error {
	// Suggested client back-off, surfaced in the body as retry_after_seconds.
	const retryAfterSeconds = 2

	envelope := fiber.Map{
		"ok":                  false,
		"error":               "idempotency_key_in_progress",
		"message":             "A request with this Idempotency-Key is still being processed.",
		"request_id":          GetRequestID(c),
		"retry_after_seconds": retryAfterSeconds,
		"agent_action":        "Another request with the same Idempotency-Key is still in flight. Do NOT mint a new key — wait ~2s and retry the SAME key; the original response will be replayed once it completes. See https://instanode.dev/docs/idempotency.",
	}

	c.Status(fiber.StatusConflict)
	return c.JSON(envelope)
}

// reserveInflight makes a best-effort attempt to place an in-flight marker at
// cacheKey via SETNX, so a concurrent same-key request that arrives after this
// point reads the marker and returns 409 in_progress instead of double-running
// the handler (BugBash 2026-06-02 #21).
//
// Parameters:
//   - ctx: context passed to the Redis call.
//   - rdb: Redis client used for the SETNX.
//   - cacheKey: the idempotency cache key to reserve.
//   - reqBodyHash: body hash stored in the marker's BodyHash field.
//
// The call is fire-and-forget and returns nothing. If the marker cannot be
// placed (marshal failure, Redis error, or another request already holds the
// key) the caller simply proceeds under the documented best-effort dedup
// posture: run the handler; the GET hit-block still catches the common case
// where the other request reserved first. The marker expires on its own after
// idempotencyInflightTTL; callers overwrite it with the real entry on
// completion or delete it on a non-cacheable response.
func reserveInflight(ctx context.Context, rdb *redis.Client, cacheKey, reqBodyHash string) {
	marker, err := json.Marshal(idemEntry{InFlight: true, BodyHash: reqBodyHash})
	if err != nil {
		// Not expected for this fixed-shape struct, but never hand SETNX an
		// empty payload: it would not be a no-op — the empty value would be
		// stored under cacheKey and every reader would fail to unmarshal it
		// until the TTL elapsed. Skip the reservation instead.
		slog.Warn("idempotency.inflight_marshal_failed", "error", err)
		return
	}

	// SETNX reports "key already exists" as a false result, not an error, so
	// losing the reservation race stays silent. Only genuine Redis failures
	// are logged; the request still proceeds (best-effort).
	if err := rdb.SetNX(ctx, cacheKey, marker, idempotencyInflightTTL).Err(); err != nil {
		slog.Warn("idempotency.inflight_reserve_failed", "error", err)
	}
}

// idempotencyExplicit handles the Stripe-shape Idempotency-Key path.
// Extracted from the main Idempotency wrapper so the fingerprint-fallback
// branch can live alongside it without nesting another layer of if-blocks.
Expand Down Expand Up @@ -371,6 +422,13 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe
slog.Warn("idempotency.cache_unmarshal_failed",
"error", jerr, "endpoint", endpoint)
} else {
if entry.InFlight {
// A concurrent request with this key is still running (it
// placed this reservation marker). Don't run the handler a
// second time (BugBash #21) — tell the caller to retry the
// same key shortly. No rate-limit refund: nothing was served.
return respondIdempotencyInProgress(c)
}
if entry.BodyHash != reqBodyHash {
// 409 is a genuine error response, not a replay — DO NOT
// refund the rate-limit counter here. The agent did the
Expand Down Expand Up @@ -407,13 +465,24 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe
}
}

// MISS — reserve the key (SETNX in-flight marker) before running the
// handler so a concurrent same-key request reads the marker and 409s
// instead of double-creating (BugBash #21). The real entry overwrites
// the marker below; non-cacheable paths delete it.
reserveInflight(ctx, rdb, cacheKey, reqBodyHash)

nextErr := c.Next()
if nextErr != nil && !IsResponseWrittenErr(nextErr) {
// Handler failed before writing a response — clear the reservation so
// an immediate retry isn't told "in progress" for the marker's TTL.
rdb.Del(context.Background(), cacheKey)
return nextErr
}

status := c.Response().StatusCode()
if status >= 500 {
// 5xx is retryable — never leave the marker stranding the retry.
rdb.Del(context.Background(), cacheKey)
return nextErr
}

Expand All @@ -426,6 +495,9 @@ func idempotencyExplicit(c *fiber.Ctx, rdb *redis.Client, endpoint, scope, rawKe
// the agent's 24h Idempotency-Key. Success + stable failures still
// cache as before.
if !shouldCacheResponse(status, body, ct) {
// Clear the reservation: a mutable 4xx is meant to be retried once
// the user resolves it, so the marker must not block that retry.
rdb.Del(context.Background(), cacheKey)
slog.Info("idempotency.skip_cache_mutable_error",
"endpoint", endpoint,
"status", status,
Expand Down Expand Up @@ -516,6 +588,12 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str
slog.Warn("idempotency.fingerprint_cache_unmarshal_failed",
"error", jerr, "endpoint", endpoint)
// Corrupt — fall through to handler and overwrite below.
} else if entry.InFlight {
// A concurrent same-fingerprint request is still running (BugBash
// #21). Return 409 in_progress rather than double-running the
// handler. The source header still reports the fingerprint path.
c.Set(idempotencySourceHeader, idempotencySourceFingerprint)
return respondIdempotencyInProgress(c)
} else {
// Cache HIT on the body-fingerprint fallback path. Same
// refund semantics as the explicit-key branch: the
Expand All @@ -532,15 +610,19 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str
}
}

// Miss — run the handler, then cache the response (non-5xx only).
// Miss — reserve the key (BugBash #21), run the handler, then cache the
// response (non-5xx only).
c.Set(idempotencySourceHeader, idempotencySourceMiss)
reserveInflight(ctx, rdb, cacheKey, sha256Hex(canonBody))
nextErr := c.Next()
if nextErr != nil && !IsResponseWrittenErr(nextErr) {
rdb.Del(context.Background(), cacheKey)
return nextErr
}

status := c.Response().StatusCode()
if status >= 500 {
rdb.Del(context.Background(), cacheKey)
return nextErr
}

Expand All @@ -553,6 +635,7 @@ func idempotencyFingerprint(c *fiber.Ctx, rdb *redis.Client, endpoint, scope str
// path is the no-header default, so the silent strand is even more
// likely here.
if !shouldCacheResponse(status, body, ct) {
rdb.Del(context.Background(), cacheKey)
slog.Info("idempotency.fingerprint_skip_cache_mutable_error",
"endpoint", endpoint,
"status", status,
Expand Down
125 changes: 125 additions & 0 deletions internal/middleware/idempotency_inflight_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package middleware_test

// idempotency_inflight_test.go — BugBash 2026-06-02 #21 regression.
//
// The middleware used to do GET(miss) → run handler → SET, with no atomic
// reservation between the GET and the SET. Two requests carrying the same
// Idempotency-Key (or the same body-fingerprint) that raced in that window
// both saw redis.Nil and both ran the handler — double-creating real
// backend resources for the authenticated provision paths that have no other
// per-burst dedup gate.
//
// The fix writes an in-flight reservation marker (SETNX) the instant a miss
// begins running the handler. A concurrent same-key request reads the marker
// and returns 409 idempotency_key_in_progress instead of re-running the
// handler. These tests hold request A inside the handler (blocked on a
// channel) so request B is guaranteed to observe A's live reservation.

import (
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"

"github.com/gofiber/fiber/v2"
"github.com/stretchr/testify/assert"

"instant.dev/internal/middleware"
"instant.dev/internal/testhelpers"
)

// newBlockingInflightApp wires up a Fiber app with the Fingerprint and
// Idempotency middleware in front of a POST /test handler designed to be held
// in flight by the test.
//
// The handler increments *ran, sends on `entered` to announce that it has
// started, then blocks until `release` yields (typically by being closed)
// before answering 201 {"ok": true}.
//
// It returns the app and the cleanup function handed back by
// testhelpers.SetupTestRedis.
func newBlockingInflightApp(t *testing.T, ran *int32, entered, release chan struct{}) (*fiber.App, func()) {
	t.Helper()

	// Handler that parks itself until the test lets it go.
	blockingHandler := func(c *fiber.Ctx) error {
		atomic.AddInt32(ran, 1)
		entered <- struct{}{}
		<-release
		return c.Status(fiber.StatusCreated).JSON(fiber.Map{"ok": true})
	}

	rdb, cleanup := testhelpers.SetupTestRedis(t)

	app := fiber.New()
	app.Use(middleware.Fingerprint())
	app.Post("/test", middleware.Idempotency(rdb, "inflight.endpoint"), blockingHandler)

	return app, cleanup
}

// fireBlocking issues a POST /test against app and delivers the response on
// done. It is meant to be run in its own goroutine for a request that will
// block inside the handler until the test releases it.
//
// ip is sent as X-Forwarded-For; key, when non-empty, is sent as
// Idempotency-Key (an empty key omits the header entirely). body is the JSON
// request payload.
func fireBlocking(app *fiber.App, ip, key, body string, done chan *http.Response) {
	// Test timeout in milliseconds passed to app.Test.
	const testTimeoutMs = 10000

	payload := strings.NewReader(body)
	req := httptest.NewRequest(http.MethodPost, "/test", payload)

	hdr := req.Header
	hdr.Set("Content-Type", "application/json")
	hdr.Set("X-Forwarded-For", ip)
	if key != "" {
		hdr.Set("Idempotency-Key", key)
	}

	// The error is deliberately dropped: there is no *testing.T here to
	// report it on. NOTE(review): on failure the response forwarded to done
	// is presumably nil — receivers should check before dereferencing.
	resp, _ := app.Test(req, testTimeoutMs)
	done <- resp
}

// TestIdempotency_ExplicitKey_ConcurrentInFlight_Returns409 — a second request
// carrying the same Idempotency-Key while the first is still running the
// handler gets 409 idempotency_key_in_progress and does NOT re-run the handler.
//
// Choreography: request A is fired in a goroutine and parks inside the
// handler; once it signals `entered`, request B is sent with the same key and
// must be rejected. A is then released and must complete with 201, and the
// handler must have run exactly once.
func TestIdempotency_ExplicitKey_ConcurrentInFlight_Returns409(t *testing.T) {
	var ran int32
	entered := make(chan struct{})
	release := make(chan struct{})
	app, clean := newBlockingInflightApp(t, &ran, entered, release)
	defer clean()

	ip := uniqueTestIP("inflight-explicit")
	key := "inflight-key-" + ip
	body := `{"x":1}`

	aDone := make(chan *http.Response, 1)
	go fireBlocking(app, ip, key, body, aDone)
	<-entered // A is in the handler; its reservation marker is live in Redis.

	// B: same key, arrives mid-flight → 409 in_progress, handler not re-run.
	respB := postWithIdem(t, app, "/test", ip, key, body)
	assert.Equal(t, http.StatusConflict, respB.StatusCode)
	bodyB := readBody(t, respB)
	assert.Contains(t, bodyB, "idempotency_key_in_progress")
	assert.Contains(t, bodyB, "request_id")

	close(release) // let A finish.
	respA := <-aDone
	// fireBlocking discards the app.Test error, so a failed or timed-out
	// request A can arrive here as a nil response. Fail with a clear message
	// rather than panicking on the dereference below.
	if respA == nil {
		t.Fatal("request A produced no response (app.Test failed or timed out)")
	}
	defer respA.Body.Close()
	assert.Equal(t, http.StatusCreated, respA.StatusCode)
	assert.Equal(t, int32(1), atomic.LoadInt32(&ran),
		"handler must run exactly once despite the concurrent same-key duplicate")
}

// TestIdempotency_Fingerprint_ConcurrentInFlight_Returns409 — the same race on
// the no-header body-fingerprint path: a concurrent identical-body request
// observes the in-flight marker and 409s instead of double-running.
//
// Choreography: request A (no Idempotency-Key) is fired in a goroutine and
// parks inside the handler; once it signals `entered`, request B is sent with
// the identical body, ip and route and must be rejected with the fingerprint
// source header set. A is then released and must complete with 201, and the
// handler must have run exactly once.
func TestIdempotency_Fingerprint_ConcurrentInFlight_Returns409(t *testing.T) {
	var ran int32
	entered := make(chan struct{})
	release := make(chan struct{})
	app, clean := newBlockingInflightApp(t, &ran, entered, release)
	defer clean()

	ip := uniqueTestIP("inflight-fp")
	body := `{"x":1}`

	aDone := make(chan *http.Response, 1)
	go fireBlocking(app, ip, "", body, aDone) // no key → fingerprint path
	<-entered

	// B: no key, identical body + ip + route → same fingerprint → 409.
	respB := postWithIdem(t, app, "/test", ip, "", body)
	assert.Equal(t, http.StatusConflict, respB.StatusCode)
	bodyB := readBody(t, respB)
	assert.Contains(t, bodyB, "idempotency_key_in_progress")
	assert.Equal(t, "fingerprint", respB.Header.Get("X-Idempotency-Source"))

	close(release)
	respA := <-aDone
	// fireBlocking discards the app.Test error, so a failed or timed-out
	// request A can arrive here as a nil response. Fail with a clear message
	// rather than panicking on the dereference below.
	if respA == nil {
		t.Fatal("request A produced no response (app.Test failed or timed out)")
	}
	defer respA.Body.Close()
	assert.Equal(t, http.StatusCreated, respA.StatusCode)
	assert.Equal(t, int32(1), atomic.LoadInt32(&ran),
		"handler must run exactly once despite the concurrent same-fingerprint duplicate")
}
Loading