From f2699f4607d53f40bee4b3bdfd9f1f9540d2ae5f Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Fri, 12 Jun 2026 10:12:11 -0700 Subject: [PATCH] feat(git): add version-pinned range serving for parallel snapshot downloads Adds a probe/range protocol so a client can download a large snapshot artifact via concurrent ranged GETs that all resolve to the same immutable S3 revision. A naive client-side parallel download is unsafe because the snapshot is mutable and multiple replicas can serve divergent copies, so ranges could stitch across revisions and corrupt the artifact. The client first probes (X-Cachew-Snapshot-Pin: probe) to obtain an opaque pin token (the S3 ETag) plus total size, then issues parallel Range GETs carrying that token. Each range is served straight from the pinned S3 revision via an If-Match GET, bypassing the per-pod disk tier, so chunks stitch correctly regardless of which replica handles each request. A regenerated snapshot fails closed with 412 so the client re-probes and restarts rather than mixing revisions. The pin token is prefixed (etag:) so it can later carry an S3 VersionId without a wire-format change. 
Co-authored-by: Amp
Amp-Thread-ID: https://ampcode.com/threads/T-019eae3d-a2fd-70ca-80fe-a7536ec6748c
---
 internal/cache/pinned.go                  |  39 ++++++
 internal/cache/s3.go                      |   8 +-
 internal/cache/s3_pinned.go               |  60 +++++++++
 internal/cache/s3_pinned_test.go          |  78 ++++++++++++
 internal/cache/tiered.go                  |  33 +++++
 internal/strategy/git/parse_range_test.go |  68 ++++++++++
 internal/strategy/git/snapshot.go         | 147 ++++++++++++++++++++++
 7 files changed, 431 insertions(+), 2 deletions(-)
 create mode 100644 internal/cache/pinned.go
 create mode 100644 internal/cache/s3_pinned.go
 create mode 100644 internal/cache/s3_pinned_test.go
 create mode 100644 internal/strategy/git/parse_range_test.go

diff --git a/internal/cache/pinned.go b/internal/cache/pinned.go
new file mode 100644
index 0000000..54a349c
--- /dev/null
+++ b/internal/cache/pinned.go
@@ -0,0 +1,39 @@
+package cache
+
+import (
+	"context"
+	"io"
+	"net/http"
+
+	"github.com/alecthomas/errors"
+)
+
+// ErrPinStale is returned when the pinned object revision no longer exists
+// (e.g. the snapshot was regenerated mid-download). Callers should re-pin and
+// restart the transfer.
+var ErrPinStale = errors.New("pinned object revision is stale")
+
+// PinnedObject describes an immutable revision of a cached object. The opaque
+// Pin token lets a client fetch byte ranges that all resolve to the same
+// revision, so parallel ranges stitch correctly even across cachew replicas.
+type PinnedObject struct {
+	// Pin is an opaque token identifying the object revision. It is echoed back
+	// to the client and supplied on subsequent range requests.
+	Pin string
+	// Size is the total object size in bytes.
+	Size int64
+	// Headers are the stored object headers (Content-Type, Last-Modified, etc.).
+	Headers http.Header
+}
+
+// PinnedRangeCache is implemented by caches that can serve a byte range from a
+// specific immutable object revision. It backs parallel snapshot downloads:
+// the client pins once, then fetches ranges concurrently against that revision.
+type PinnedRangeCache interface {
+	// Pin stats the object and returns an opaque revision token plus size and
+	// headers. Returns os.ErrNotExist if the object is absent.
+	Pin(ctx context.Context, key Key) (PinnedObject, error)
+	// OpenPinnedRange returns a reader for the inclusive byte range [start, end] of the
+	// revision identified by pin. ErrPinStale may surface here or on the reader's Read.
+	OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error)
+}
diff --git a/internal/cache/s3.go b/internal/cache/s3.go
index 6042e2a..0c7ce94 100644
--- a/internal/cache/s3.go
+++ b/internal/cache/s3.go
@@ -243,10 +243,14 @@ func (r *s3Reader) Read(p []byte) (int, error) {
 	if err == nil || errors.Is(err, io.EOF) {
 		return n, err //nolint:wrapcheck
 	}
-	// Convert NoSuchKey to os.ErrNotExist for consistency
 	errResponse := minio.ToErrorResponse(err)
-	if errResponse.Code == s3ErrNoSuchKey {
+	switch {
+	case errResponse.Code == s3ErrNoSuchKey:
+		// Convert NoSuchKey to os.ErrNotExist for consistency.
 		return n, os.ErrNotExist
+	case errResponse.StatusCode == http.StatusPreconditionFailed || errResponse.Code == "PreconditionFailed":
+		// A pinned (If-Match ETag) range read was refused: the object was overwritten.
+		return n, ErrPinStale
 	}
 	return n, errors.WithStack(err)
 }
diff --git a/internal/cache/s3_pinned.go b/internal/cache/s3_pinned.go
new file mode 100644
index 0000000..7f4e6c9
--- /dev/null
+++ b/internal/cache/s3_pinned.go
@@ -0,0 +1,60 @@
+package cache
+
+import (
+	"context"
+	"io"
+	"strconv"
+	"strings"
+
+	"github.com/alecthomas/errors"
+	"github.com/minio/minio-go/v7"
+)
+
+// pinETagPrefix tags ETag-based pin tokens. Keeping the token prefixed lets us
+// add a "version:" form later (S3 VersionId) without changing the wire format.
+const pinETagPrefix = "etag:"
+
+var _ PinnedRangeCache = (*S3)(nil)
+
+// Pin returns an opaque revision token (the "etag:"-prefixed S3 ETag) plus the object's
+// size and headers; Content-Length in the returned headers is set from that size.
+func (s *S3) Pin(ctx context.Context, key Key) (PinnedObject, error) {
+	objInfo, headers, err := s.statAndHeaders(ctx, key)
+	if err != nil {
+		return PinnedObject{}, err
+	}
+	headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10))
+	return PinnedObject{
+		Pin:     pinETagPrefix + objInfo.ETag,
+		Size:    objInfo.Size,
+		Headers: headers,
+	}, nil
+}
+
+// OpenPinnedRange serves bytes [start, end] (inclusive) from the revision identified by
+// pin. The ETag pin fails closed: if the object was overwritten, reading the returned
+// reader yields ErrPinStale rather than mixing old and new bytes.
+func (s *S3) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) {
+	etag, ok := strings.CutPrefix(pin, pinETagPrefix)
+	if !ok {
+		return nil, errors.Errorf("unsupported pin token %q", pin)
+	}
+
+	objectName := s.keyToPath(s.namespace, key)
+	opts := minio.GetObjectOptions{}
+	if err := opts.SetRange(start, end); err != nil {
+		return nil, errors.Errorf("set range %d-%d: %w", start, end, err)
+	}
+	if err := opts.SetMatchETag(etag); err != nil {
+		return nil, errors.Errorf("set etag %s: %w", etag, err)
+	}
+
+	// GetObject is lazy and must not be Stat()'d here: calling Stat() after
+	// SetRange makes minio re-fetch the whole object. Errors (including a 412
+	// for a stale ETag pin) surface on Read and are mapped by s3Reader.Read.
+	obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, opts)
+	if err != nil {
+		return nil, errors.Errorf("get pinned range: %w", err)
+	}
+	return &s3Reader{obj: obj}, nil
+}
diff --git a/internal/cache/s3_pinned_test.go b/internal/cache/s3_pinned_test.go
new file mode 100644
index 0000000..608ffb0
--- /dev/null
+++ b/internal/cache/s3_pinned_test.go
@@ -0,0 +1,78 @@
+package cache_test
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io"
+	"testing"
+
+	"github.com/alecthomas/assert/v2"
+
+	"github.com/block/cachew/internal/cache"
+	"github.com/block/cachew/internal/s3client/s3clienttest"
+)
+
+// TestS3PinnedRange verifies that an object can be pinned and reassembled
+// byte-for-byte from independent ranged reads, and that overwriting the object
+// invalidates the old pin (fail-closed) rather than mixing revisions.
+func TestS3PinnedRange(t *testing.T) {
+	bucket := s3clienttest.Start(t)
+	c := newS3Cache(t, bucket)
+	defer c.Close()
+
+	pc, ok := c.(cache.PinnedRangeCache)
+	assert.True(t, ok, "S3 cache must implement PinnedRangeCache")
+
+	ctx := t.Context()
+	key := cache.NewKey("pinned-object")
+
+	// 10 MiB of random data so multiple distinct ranges are exercised.
+	data := make([]byte, 10<<20)
+	_, err := rand.Read(data)
+	assert.NoError(t, err)
+
+	w, err := c.Create(ctx, key, nil, 0)
+	assert.NoError(t, err)
+	_, err = w.Write(data)
+	assert.NoError(t, err)
+	assert.NoError(t, w.Close())
+
+	pin, err := pc.Pin(ctx, key)
+	assert.NoError(t, err)
+	assert.Equal(t, int64(len(data)), pin.Size)
+	assert.NotZero(t, pin.Pin)
+
+	// Reassemble from four inclusive [start, end] ranges fetched out of order.
+	chunk := int64(len(data)) / 4
+	bounds := [][2]int64{
+		{2 * chunk, 3*chunk - 1},
+		{0, chunk - 1},
+		{3 * chunk, int64(len(data)) - 1},
+		{chunk, 2*chunk - 1},
+	}
+	got := make([]byte, len(data))
+	for _, b := range bounds {
+		r, err := pc.OpenPinnedRange(ctx, key, pin.Pin, b[0], b[1])
+		assert.NoError(t, err)
+		part, err := io.ReadAll(r)
+		assert.NoError(t, err)
+		assert.NoError(t, r.Close())
+		assert.Equal(t, b[1]-b[0]+1, int64(len(part)))
+		copy(got[b[0]:], part)
+	}
+	assert.True(t, bytes.Equal(data, got), "stitched ranges must match original")
+
+	// Overwrite the object: the old pin must now fail closed.
+	w2, err := c.Create(ctx, key, nil, 0)
+	assert.NoError(t, err)
+	_, err = w2.Write(make([]byte, len(data)))
+	assert.NoError(t, err)
+	assert.NoError(t, w2.Close())
+
+	// Opening succeeds (the GET is lazy); the ETag mismatch surfaces on read.
+	stale, err := pc.OpenPinnedRange(ctx, key, pin.Pin, 0, chunk-1)
+	assert.NoError(t, err)
+	_, err = io.ReadAll(stale)
+	assert.NoError(t, stale.Close())
+	assert.IsError(t, err, cache.ErrPinStale)
+}
diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go
index 65982c6..5ba52a0 100644
--- a/internal/cache/tiered.go
+++ b/internal/cache/tiered.go
@@ -340,3 +340,36 @@ func (t Tiered) ListNamespaces(ctx context.Context) ([]string, error) {
 	sort.Strings(namespaces)
 	return namespaces, nil
 }
+
+var _ PinnedRangeCache = (*Tiered)(nil)
+
+// Pin delegates to the first tier that supports pinned range serving (the S3
+// tier). The per-pod disk tier is deliberately skipped: only the shared,
+// authoritative object can be pinned consistently across replicas.
+func (t Tiered) Pin(ctx context.Context, key Key) (PinnedObject, error) {
+	pc := t.pinnedTier()
+	if pc == nil {
+		return PinnedObject{}, errors.New("no pinnable cache tier configured")
+	}
+	obj, err := pc.Pin(ctx, key)
+	return obj, errors.WithStack(err)
+}
+
+// OpenPinnedRange delegates to the pinnable tier (S3), bypassing disk.
+func (t Tiered) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) {
+	pc := t.pinnedTier()
+	if pc == nil {
+		return nil, errors.New("no pinnable cache tier configured")
+	}
+	rc, err := pc.OpenPinnedRange(ctx, key, pin, start, end)
+	return rc, errors.WithStack(err)
+}
+
+func (t Tiered) pinnedTier() PinnedRangeCache {
+	for _, c := range t.caches {
+		if pc, ok := c.(PinnedRangeCache); ok {
+			return pc // the first tier implementing PinnedRangeCache wins
+		}
+	}
+	return nil // no tier can serve pinned ranges
+}
diff --git a/internal/strategy/git/parse_range_test.go b/internal/strategy/git/parse_range_test.go
new file mode 100644
index 0000000..c23d839
--- /dev/null
+++ b/internal/strategy/git/parse_range_test.go
@@ -0,0 +1,68 @@
+package git //nolint:testpackage // white-box test for unexported range helpers
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/alecthomas/assert/v2"
+
+	"github.com/block/cachew/internal/cache"
+)
+
+func TestParseSingleByteRange(t *testing.T) {
+	tests := []struct {
+		header     string
+		start, end int64
+		ok         bool
+	}{
+		{"bytes=0-99", 0, 99, true},
+		{"bytes=100-199", 100, 199, true},
+		{"bytes=5-5", 5, 5, true},
+		{"", 0, 0, false},
+		{"bytes=0-", 0, 0, false},           // open-ended rejected
+		{"bytes=-100", 0, 0, false},         // suffix rejected
+		{"bytes=0-99,200-299", 0, 0, false}, // multi-range rejected
+		{"bytes=99-0", 0, 0, false},         // end < start rejected
+		{"items=0-99", 0, 0, false},         // wrong unit
+		{"bytes=a-b", 0, 0, false},          // non-numeric
+	}
+	for _, tt := range tests {
+		start, end, ok := parseSingleByteRange(tt.header)
+		assert.Equal(t, tt.ok, ok, "ok for %q", tt.header)
+		if tt.ok {
+			assert.Equal(t, tt.start, start, "start for %q", tt.header)
+			assert.Equal(t, tt.end, end, "end for %q", tt.header)
+		}
+	}
+}
+
+// failPinCache fails the test if the pinned-range handler reaches the cache,
+// proving the oversized-range guard rejects the request before any read.
+type failPinCache struct{ t *testing.T }
+
+func (p failPinCache) Pin(context.Context, cache.Key) (cache.PinnedObject, error) {
+	p.t.Fatal("Pin must not be called")
+	return cache.PinnedObject{}, nil //nolint:nilnil // unreachable; t.Fatal above
+}
+
+func (p failPinCache) OpenPinnedRange(context.Context, cache.Key, string, int64, int64) (io.ReadCloser, error) {
+	p.t.Fatal("OpenPinnedRange must not be called for an oversized range")
+	return nil, nil //nolint:nilnil // unreachable; t.Fatal above
+}
+
+func TestServePinRangeRejectsOversizedRange(t *testing.T) {
+	s := &Strategy{}
+	req := httptest.NewRequest(http.MethodGet, "/git/x/snapshot.tar.zst", nil)
+	// Inclusive range 0..maxPinRangeBytes: length maxPinRangeBytes + 1, one byte over the cap.
+	req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", int64(maxPinRangeBytes)))
+	w := httptest.NewRecorder()
+
+	s.servePinRange(w, req, failPinCache{t}, cache.NewKey("k"), "repo", "etag:x", time.Now())
+
+	assert.Equal(t, http.StatusRequestedRangeNotSatisfiable, w.Code)
+}
diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go
index 28b1389..9482ad6 100644
--- a/internal/strategy/git/snapshot.go
+++ b/internal/strategy/git/snapshot.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"syscall"
@@ -248,6 +249,15 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
 
 	cacheKey := snapshotCacheKey(upstreamURL)
 
+	// Version-pinned parallel download path. When the client supplies a pin
+	// header it is either probing for a revision token or fetching a byte range
+	// pinned to one. Both are served straight from the shared S3 revision,
+	// bypassing the per-pod disk tier and cold-start/freshness logic below.
+	if pin := r.Header.Get(snapshotPinHeader); pin != "" {
+		s.servePinnedSnapshot(w, r, cacheKey, repoName, pin, start)
+		return
+	}
+
 	// On cold start the local mirror may not be ready yet. Check the S3 cache
 	// first so we can stream a cached snapshot to the client immediately while
 	// the mirror restores in the background. This avoids blocking the client
@@ -336,6 +346,143 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
 	}
 }
 
+const (
+	// snapshotPinHeader carries either the probe sentinel (to request a pin
+	// token) or a previously issued opaque pin token (on range requests).
+	snapshotPinHeader = "X-Cachew-Snapshot-Pin"
+	// snapshotSizeHeader reports the total artifact size on a probe response and
+	// is echoed by the client on range requests so cachew can fill Content-Range
+	// without an extra stat per chunk.
+	snapshotSizeHeader = "X-Cachew-Snapshot-Size"
+	// snapshotPinStaleHeader signals the pinned revision is gone; re-probe.
+	snapshotPinStaleHeader = "X-Cachew-Snapshot-Pin-Stale"
+	pinProbeValue          = "probe"
+
+	// maxPinRangeBytes caps a single pinned range. Each range is read fully into
+	// memory (io.ReadAll) before serving so that a stale pin can fail cleanly;
+	// an uncapped client-controlled range could therefore exhaust pod memory.
+	// Clients must request chunks no larger than this.
+	maxPinRangeBytes = 256 << 20 // 256 MiB
+)
+
+// servePinnedSnapshot implements version-pinned range serving for parallel
+// snapshot downloads. The client first probes to obtain an opaque pin token +
+// size, then issues concurrent Range GETs carrying that token. Every range
+// resolves to the same immutable S3 revision, so chunks stitch correctly no
+// matter which cachew replica handles each request. A regenerated snapshot
+// fails closed with 412 so the client re-probes rather than mixing revisions.
+func (s *Strategy) servePinnedSnapshot(w http.ResponseWriter, r *http.Request, cacheKey cache.Key, repoName, pin string, start time.Time) {
+	// The header value is either the probe sentinel or a previously issued pin token.
+	pinned, supported := s.cache.(cache.PinnedRangeCache)
+	switch {
+	case !supported:
+		http.Error(w, "pinned range serving unavailable", http.StatusNotImplemented)
+	case pin == pinProbeValue:
+		s.servePinProbe(w, r, pinned, cacheKey, repoName, start)
+	default:
+		s.servePinRange(w, r, pinned, cacheKey, repoName, pin, start)
+	}
+}
+
+// servePinProbe answers a probe with the current revision's pin token, size and stored headers; no body is written.
+func (s *Strategy) servePinProbe(w http.ResponseWriter, r *http.Request, pc cache.PinnedRangeCache, cacheKey cache.Key, repoName string, start time.Time) {
+	ctx := r.Context()
+	revision, err := pc.Pin(ctx, cacheKey)
+	if errors.Is(err, os.ErrNotExist) {
+		// Not in the shared tier yet; the client falls back to the normal stream, which warms S3 for a later pinned download.
+		http.Error(w, "snapshot not pinnable", http.StatusConflict)
+		return
+	}
+	if err != nil {
+		logging.FromContext(ctx).ErrorContext(ctx, "Failed to pin snapshot", "repo", repoName, "error", err)
+		http.Error(w, "internal server error", http.StatusInternalServerError)
+		return
+	}
+	out := w.Header()
+	for name, values := range revision.Headers {
+		for _, value := range values {
+			out.Add(name, value)
+		}
+	}
+	out.Set(snapshotPinHeader, revision.Pin)
+	out.Set(snapshotSizeHeader, strconv.FormatInt(revision.Size, 10))
+	out.Set("Content-Length", strconv.FormatInt(revision.Size, 10)) // NOTE(review): declared without a body; confirm probes are HEAD-only
+	out.Set("Accept-Ranges", "bytes")
+	w.WriteHeader(http.StatusOK)
+	s.metrics.recordSnapshotServe(ctx, "pin_probe", repoName, 0, time.Since(start))
+}
+
+// servePinRange serves one byte range pinned to a prior probe's revision token.
+func (s *Strategy) servePinRange(w http.ResponseWriter, r *http.Request, pc cache.PinnedRangeCache, cacheKey cache.Key, repoName, pin string, start time.Time) {
+	ctx := r.Context()
+	startByte, endByte, ok := parseSingleByteRange(r.Header.Get("Range"))
+	if !ok {
+		http.Error(w, "pinned request requires a single bytes=a-b range", http.StatusBadRequest)
+		return
+	}
+	if endByte-startByte >= maxPinRangeBytes { // length is end-start+1, but that +1 overflows int64 when end is MaxInt64
+		http.Error(w, "pinned range exceeds maximum chunk size", http.StatusRequestedRangeNotSatisfiable)
+		return
+	}
+
+	reader, err := pc.OpenPinnedRange(ctx, cacheKey, pin, startByte, endByte)
+	if err == nil {
+		// The guard above caps the range, so it is buffered in full: a stale-pin
+		// error then surfaces from ReadAll before any 206 status is written.
+		defer reader.Close()
+		var buf []byte
+		if buf, err = io.ReadAll(reader); err == nil {
+			total := r.Header.Get(snapshotSizeHeader) // client-echoed; only a plain decimal size is reflected back
+			if _, perr := strconv.ParseUint(total, 10, 63); perr != nil {
+				total = "*"
+			}
+			w.Header().Set(snapshotPinHeader, pin)
+			w.Header().Set("Content-Type", "application/zstd")
+			w.Header().Set("Content-Length", strconv.Itoa(len(buf)))
+			w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startByte, max(startByte, startByte+int64(len(buf))-1), total)) // end = last byte actually read
+			w.Header().Set("Accept-Ranges", "bytes")
+			w.WriteHeader(http.StatusPartialContent)
+			if _, werr := w.Write(buf); werr != nil {
+				logging.FromContext(ctx).WarnContext(ctx, "Failed to write pinned range", "repo", repoName, "error", werr)
+			}
+			s.metrics.recordSnapshotServe(ctx, "pin_range", repoName, int64(len(buf)), time.Since(start))
+			return
+		}
+	}
+	switch { // err is non-nil here: it came from OpenPinnedRange or from ReadAll
+	case errors.Is(err, cache.ErrPinStale):
+		w.Header().Set(snapshotPinStaleHeader, "true")
+		http.Error(w, "pinned revision is stale", http.StatusPreconditionFailed)
+	case errors.Is(err, os.ErrNotExist):
+		http.Error(w, "snapshot not found", http.StatusNotFound)
+	default:
+		logging.FromContext(ctx).ErrorContext(ctx, "Failed to serve pinned range", "repo", repoName, "error", err)
+		http.Error(w, "internal server error", http.StatusInternalServerError)
+	}
+}
+
+// parseSingleByteRange parses a closed "bytes=a-b" Range header with 0 <= a <= b. Open-ended,
+// suffix and multi-range forms are rejected, as are a wrong unit and non-numeric bounds.
+func parseSingleByteRange(header string) (start, end int64, ok bool) {
+	spec, found := strings.CutPrefix(header, "bytes=")
+	if !found || strings.Contains(spec, ",") {
+		return 0, 0, false
+	}
+	lo, hi, found := strings.Cut(spec, "-") // lo cannot contain '-', so a parsed start is never negative
+	if !found || lo == "" || hi == "" {
+		return 0, 0, false
+	}
+	start, err := strconv.ParseInt(lo, 10, 64)
+	if err != nil {
+		return 0, 0, false
+	}
+	end, err = strconv.ParseInt(hi, 10, 64)
+	if err != nil || end < start {
+		return 0, 0, false
+	}
+	return start, end, true
+}
+
 func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error {
 	for key, values := range headers {
 		for _, value := range values {