diff --git a/internal/cache/pinned.go b/internal/cache/pinned.go new file mode 100644 index 0000000..54a349c --- /dev/null +++ b/internal/cache/pinned.go @@ -0,0 +1,39 @@ +package cache + +import ( + "context" + "io" + "net/http" + + "github.com/alecthomas/errors" +) + +// ErrPinStale is returned when the pinned object revision no longer exists +// (e.g. the snapshot was regenerated mid-download). Callers should re-pin and +// restart the transfer. +var ErrPinStale = errors.New("pinned object revision is stale") + +// PinnedObject describes an immutable revision of a cached object. The opaque +// Pin token lets a client fetch byte ranges that all resolve to the same +// revision, so parallel ranges stitch correctly even across cachew replicas. +type PinnedObject struct { + // Pin is an opaque token identifying the object revision. It is echoed back + // to the client and supplied on subsequent range requests. + Pin string + // Size is the total object size in bytes. + Size int64 + // Headers are the stored object headers (Content-Type, Last-Modified, etc.). + Headers http.Header +} + +// PinnedRangeCache is implemented by caches that can serve a byte range from a +// specific immutable object revision. It backs parallel snapshot downloads: +// the client pins once, then fetches ranges concurrently against that revision. +type PinnedRangeCache interface { + // Pin stats the object and returns an opaque revision token plus size and + // headers. Returns os.ErrNotExist if the object is absent. + Pin(ctx context.Context, key Key) (PinnedObject, error) + // OpenPinnedRange returns a reader for bytes [start, end] of the revision + // identified by pin. Returns ErrPinStale if the revision is gone. + OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) +} diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 6042e2a..0c7ce94 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -243,10 +243,14 @@ func (r *s3Reader) Read(p []byte) (int, error) { if err == nil || errors.Is(err, io.EOF) { return n, err //nolint:wrapcheck } - // Convert NoSuchKey to os.ErrNotExist for consistency errResponse := minio.ToErrorResponse(err) - if errResponse.Code == s3ErrNoSuchKey { + switch { + case errResponse.Code == s3ErrNoSuchKey: + // Convert NoSuchKey to os.ErrNotExist for consistency. return n, os.ErrNotExist + case errResponse.StatusCode == http.StatusPreconditionFailed || errResponse.Code == "PreconditionFailed": + // A pinned (If-Match ETag) range whose object was overwritten. + return n, ErrPinStale } return n, errors.WithStack(err) } diff --git a/internal/cache/s3_pinned.go b/internal/cache/s3_pinned.go new file mode 100644 index 0000000..7f4e6c9 --- /dev/null +++ b/internal/cache/s3_pinned.go @@ -0,0 +1,60 @@ +package cache + +import ( + "context" + "io" + "strconv" + "strings" + + "github.com/alecthomas/errors" + "github.com/minio/minio-go/v7" +) + +// pinETagPrefix tags ETag-based pin tokens. Keeping the token prefixed lets us +// add a "version:" form later (S3 VersionId) without changing the wire format. +const pinETagPrefix = "etag:" + +var _ PinnedRangeCache = (*S3)(nil) + +// Pin returns an opaque revision token (the S3 ETag) for the object, so that +// subsequent range reads can be pinned to this exact revision. +func (s *S3) Pin(ctx context.Context, key Key) (PinnedObject, error) { + objInfo, headers, err := s.statAndHeaders(ctx, key) + if err != nil { + return PinnedObject{}, err + } + headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10)) + return PinnedObject{ + Pin: pinETagPrefix + objInfo.ETag, + Size: objInfo.Size, + Headers: headers, + }, nil +} + +// OpenPinnedRange serves bytes [start, end] from the revision identified by pin. +// The ETag pin fails closed: if the object was overwritten the request returns +// ErrPinStale rather than mixing old and new bytes. +func (s *S3) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) { + etag, ok := strings.CutPrefix(pin, pinETagPrefix) + if !ok { + return nil, errors.Errorf("unsupported pin token %q", pin) + } + + objectName := s.keyToPath(s.namespace, key) + opts := minio.GetObjectOptions{} + if err := opts.SetRange(start, end); err != nil { + return nil, errors.Errorf("set range %d-%d: %w", start, end, err) + } + if err := opts.SetMatchETag(etag); err != nil { + return nil, errors.Errorf("set etag %s: %w", etag, err) + } + + // GetObject is lazy and must not be Stat()'d here: calling Stat() after + // SetRange makes minio re-fetch the whole object. Errors (including a 412 + // for a stale ETag pin) surface on Read and are mapped by s3Reader.Read. + obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, opts) + if err != nil { + return nil, errors.Errorf("get pinned range: %w", err) + } + return &s3Reader{obj: obj}, nil +} diff --git a/internal/cache/s3_pinned_test.go b/internal/cache/s3_pinned_test.go new file mode 100644 index 0000000..608ffb0 --- /dev/null +++ b/internal/cache/s3_pinned_test.go @@ -0,0 +1,78 @@ +package cache_test + +import ( + "bytes" + "crypto/rand" + "io" + "testing" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" + "github.com/block/cachew/internal/s3client/s3clienttest" +) + +// TestS3PinnedRange verifies that an object can be pinned and reassembled +// byte-for-byte from independent ranged reads, and that overwriting the object +// invalidates the old pin (fail-closed) rather than mixing revisions. +func TestS3PinnedRange(t *testing.T) { + bucket := s3clienttest.Start(t) + c := newS3Cache(t, bucket) + defer c.Close() + + pc, ok := c.(cache.PinnedRangeCache) + assert.True(t, ok, "S3 cache must implement PinnedRangeCache") + + ctx := t.Context() + key := cache.NewKey("pinned-object") + + // 10 MiB of random data so multiple distinct ranges are exercised. + data := make([]byte, 10<<20) + _, err := rand.Read(data) + assert.NoError(t, err) + + w, err := c.Create(ctx, key, nil, 0) + assert.NoError(t, err) + _, err = w.Write(data) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + pin, err := pc.Pin(ctx, key) + assert.NoError(t, err) + assert.Equal(t, int64(len(data)), pin.Size) + assert.NotZero(t, pin.Pin) + + // Reassemble from four ranges in arbitrary order. + chunk := int64(len(data)) / 4 + bounds := [][2]int64{ + {2 * chunk, 3*chunk - 1}, + {0, chunk - 1}, + {3 * chunk, int64(len(data)) - 1}, + {chunk, 2*chunk - 1}, + } + got := make([]byte, len(data)) + for _, b := range bounds { + r, err := pc.OpenPinnedRange(ctx, key, pin.Pin, b[0], b[1]) + assert.NoError(t, err) + part, err := io.ReadAll(r) + assert.NoError(t, err) + assert.NoError(t, r.Close()) + assert.Equal(t, b[1]-b[0]+1, int64(len(part))) + copy(got[b[0]:], part) + } + assert.True(t, bytes.Equal(data, got), "stitched ranges must match original") + + // Overwrite the object: the old pin must now fail closed. + w2, err := c.Create(ctx, key, nil, 0) + assert.NoError(t, err) + _, err = w2.Write(make([]byte, len(data))) + assert.NoError(t, err) + assert.NoError(t, w2.Close()) + + // The pin fails closed: the ETag mismatch surfaces on read. + stale, err := pc.OpenPinnedRange(ctx, key, pin.Pin, 0, chunk-1) + assert.NoError(t, err) + _, err = io.ReadAll(stale) + assert.NoError(t, stale.Close()) + assert.IsError(t, err, cache.ErrPinStale) +} diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 65982c6..5ba52a0 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -340,3 +340,36 @@ func (t Tiered) ListNamespaces(ctx context.Context) ([]string, error) { sort.Strings(namespaces) return namespaces, nil } + +var _ PinnedRangeCache = (*Tiered)(nil) + +// Pin delegates to the first tier that supports pinned range serving (the S3 +// tier). The per-pod disk tier is deliberately skipped: only the shared, +// authoritative object can be pinned consistently across replicas. +func (t Tiered) Pin(ctx context.Context, key Key) (PinnedObject, error) { + pc := t.pinnedTier() + if pc == nil { + return PinnedObject{}, errors.New("no pinnable cache tier configured") + } + obj, err := pc.Pin(ctx, key) + return obj, errors.WithStack(err) +} + +// OpenPinnedRange delegates to the pinnable tier (S3), bypassing disk. +func (t Tiered) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) { + pc := t.pinnedTier() + if pc == nil { + return nil, errors.New("no pinnable cache tier configured") + } + rc, err := pc.OpenPinnedRange(ctx, key, pin, start, end) + return rc, errors.WithStack(err) +} + +func (t Tiered) pinnedTier() PinnedRangeCache { + for _, c := range t.caches { + if pc, ok := c.(PinnedRangeCache); ok { + return pc + } + } + return nil +} diff --git a/internal/strategy/git/parse_range_test.go b/internal/strategy/git/parse_range_test.go new file mode 100644 index 0000000..c23d839 --- /dev/null +++ b/internal/strategy/git/parse_range_test.go @@ -0,0 +1,68 @@ +package git //nolint:testpackage // white-box test for unexported range helpers + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" +) + +func TestParseSingleByteRange(t *testing.T) { + tests := []struct { + header string + start, end int64 + ok bool + }{ + {"bytes=0-99", 0, 99, true}, + {"bytes=100-199", 100, 199, true}, + {"bytes=5-5", 5, 5, true}, + {"", 0, 0, false}, + {"bytes=0-", 0, 0, false}, // open-ended rejected + {"bytes=-100", 0, 0, false}, // suffix rejected + {"bytes=0-99,200-299", 0, 0, false}, // multi-range rejected + {"bytes=99-0", 0, 0, false}, // end < start rejected + {"items=0-99", 0, 0, false}, // wrong unit + {"bytes=a-b", 0, 0, false}, // non-numeric + } + for _, tt := range tests { + start, end, ok := parseSingleByteRange(tt.header) + assert.Equal(t, tt.ok, ok, "ok for %q", tt.header) + if tt.ok { + assert.Equal(t, tt.start, start, "start for %q", tt.header) + assert.Equal(t, tt.end, end, "end for %q", tt.header) + } + } +} + +// failPinCache fails the test if the pinned-range handler reaches the cache, +// proving the oversized-range guard rejects the request before any read. +type failPinCache struct{ t *testing.T } + +func (p failPinCache) Pin(context.Context, cache.Key) (cache.PinnedObject, error) { + p.t.Fatal("Pin must not be called") + return cache.PinnedObject{}, nil //nolint:nilnil // unreachable; t.Fatal above +} + +func (p failPinCache) OpenPinnedRange(context.Context, cache.Key, string, int64, int64) (io.ReadCloser, error) { + p.t.Fatal("OpenPinnedRange must not be called for an oversized range") + return nil, nil //nolint:nilnil // unreachable; t.Fatal above +} + +func TestServePinRangeRejectsOversizedRange(t *testing.T) { + s := &Strategy{} + req := httptest.NewRequest(http.MethodGet, "/git/x/snapshot.tar.zst", nil) + // Length = maxPinRangeBytes + 1, one byte over the cap. + req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", int64(maxPinRangeBytes))) + w := httptest.NewRecorder() + + s.servePinRange(w, req, failPinCache{t}, cache.NewKey("k"), "repo", "etag:x", time.Now()) + + assert.Equal(t, http.StatusRequestedRangeNotSatisfiable, w.Code) +} diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 28b1389..9482ad6 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "sync" "syscall" @@ -248,6 +249,15 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, cacheKey := snapshotCacheKey(upstreamURL) + // Version-pinned parallel download path. When the client supplies a pin + // header it is either probing for a revision token or fetching a byte range + // pinned to one. Both are served straight from the shared S3 revision, + // bypassing the per-pod disk tier and cold-start/freshness logic below. + if pin := r.Header.Get(snapshotPinHeader); pin != "" { + s.servePinnedSnapshot(w, r, cacheKey, repoName, pin, start) + return + } + // On cold start the local mirror may not be ready yet. Check the S3 cache // first so we can stream a cached snapshot to the client immediately while // the mirror restores in the background. This avoids blocking the client @@ -336,6 +346,143 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } } +const ( + // snapshotPinHeader carries either the probe sentinel (to request a pin + // token) or a previously issued opaque pin token (on range requests). + snapshotPinHeader = "X-Cachew-Snapshot-Pin" + // snapshotSizeHeader reports the total artifact size on a probe response and + // is echoed by the client on range requests so cachew can fill Content-Range + // without an extra stat per chunk. + snapshotSizeHeader = "X-Cachew-Snapshot-Size" + // snapshotPinStaleHeader signals the pinned revision is gone; re-probe. + snapshotPinStaleHeader = "X-Cachew-Snapshot-Pin-Stale" + pinProbeValue = "probe" + + // maxPinRangeBytes caps a single pinned range. A range is read fully into + // memory (io.ReadAll) before serving so a stale pin can fail cleanly, so an + // uncapped client-controlled range could exhaust pod memory. Clients must + // request chunks no larger than this. + maxPinRangeBytes = 256 << 20 // 256 MiB +) + +// servePinnedSnapshot implements version-pinned range serving for parallel +// snapshot downloads. The client first probes to obtain an opaque pin token + +// size, then issues concurrent Range GETs carrying that token. Every range +// resolves to the same immutable S3 revision, so chunks stitch correctly no +// matter which cachew replica handles each request. A regenerated snapshot +// fails closed with 412 so the client re-probes rather than mixing revisions. +func (s *Strategy) servePinnedSnapshot(w http.ResponseWriter, r *http.Request, cacheKey cache.Key, repoName, pin string, start time.Time) { + pc, ok := s.cache.(cache.PinnedRangeCache) + if !ok { + http.Error(w, "pinned range serving unavailable", http.StatusNotImplemented) + return + } + if pin == pinProbeValue { + s.servePinProbe(w, r, pc, cacheKey, repoName, start) + return + } + s.servePinRange(w, r, pc, cacheKey, repoName, pin, start) +} + +// servePinProbe returns the pin token + size for the current S3 revision. +func (s *Strategy) servePinProbe(w http.ResponseWriter, r *http.Request, pc cache.PinnedRangeCache, cacheKey cache.Key, repoName string, start time.Time) { + ctx := r.Context() + obj, err := pc.Pin(ctx, cacheKey) + switch { + case errors.Is(err, os.ErrNotExist): + // Not in the shared tier yet; client falls back to the normal stream, + // which warms S3 for a subsequent pinned download. + http.Error(w, "snapshot not pinnable", http.StatusConflict) + return + case err != nil: + logging.FromContext(ctx).ErrorContext(ctx, "Failed to pin snapshot", "repo", repoName, "error", err) + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + for k, vs := range obj.Headers { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.Header().Set(snapshotPinHeader, obj.Pin) + w.Header().Set(snapshotSizeHeader, strconv.FormatInt(obj.Size, 10)) + w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10)) + w.Header().Set("Accept-Ranges", "bytes") + w.WriteHeader(http.StatusOK) + s.metrics.recordSnapshotServe(ctx, "pin_probe", repoName, 0, time.Since(start)) +} + +// servePinRange serves one byte range pinned to a prior probe's revision token. +func (s *Strategy) servePinRange(w http.ResponseWriter, r *http.Request, pc cache.PinnedRangeCache, cacheKey cache.Key, repoName, pin string, start time.Time) { + ctx := r.Context() + startByte, endByte, ok := parseSingleByteRange(r.Header.Get("Range")) + if !ok { + http.Error(w, "pinned request requires a single bytes=a-b range", http.StatusBadRequest) + return + } + if endByte-startByte+1 > maxPinRangeBytes { + http.Error(w, "pinned range exceeds maximum chunk size", http.StatusRequestedRangeNotSatisfiable) + return + } + + reader, err := pc.OpenPinnedRange(ctx, cacheKey, pin, startByte, endByte) + if err == nil { + // The range is bounded by the client's chunk size, so buffering it lets + // a stale-pin error surface before any 206 status is written. + defer reader.Close() + var buf []byte + if buf, err = io.ReadAll(reader); err == nil { + total := r.Header.Get(snapshotSizeHeader) + if total == "" { + total = "*" + } + w.Header().Set(snapshotPinHeader, pin) + w.Header().Set("Content-Type", "application/zstd") + w.Header().Set("Content-Length", strconv.Itoa(len(buf))) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startByte, endByte, total)) + w.Header().Set("Accept-Ranges", "bytes") + w.WriteHeader(http.StatusPartialContent) + if _, werr := w.Write(buf); werr != nil { + logging.FromContext(ctx).WarnContext(ctx, "Failed to write pinned range", "repo", repoName, "error", werr) + } + s.metrics.recordSnapshotServe(ctx, "pin_range", repoName, int64(len(buf)), time.Since(start)) + return + } + } + switch { + case errors.Is(err, cache.ErrPinStale): + w.Header().Set(snapshotPinStaleHeader, "true") + http.Error(w, "pinned revision is stale", http.StatusPreconditionFailed) + case errors.Is(err, os.ErrNotExist): + http.Error(w, "snapshot not found", http.StatusNotFound) + default: + logging.FromContext(ctx).ErrorContext(ctx, "Failed to serve pinned range", "repo", repoName, "error", err) + http.Error(w, "internal server error", http.StatusInternalServerError) + } +} + +// parseSingleByteRange parses a closed "bytes=a-b" range header. Open-ended or +// multi-range forms are rejected; pinned clients always request closed ranges. +func parseSingleByteRange(header string) (start, end int64, ok bool) { + spec, found := strings.CutPrefix(header, "bytes=") + if !found || strings.Contains(spec, ",") { + return 0, 0, false + } + lo, hi, found := strings.Cut(spec, "-") + if !found || lo == "" || hi == "" { + return 0, 0, false + } + start, err := strconv.ParseInt(lo, 10, 64) + if err != nil { + return 0, 0, false + } + end, err = strconv.ParseInt(hi, 10, 64) + if err != nil || end < start { + return 0, 0, false + } + return start, end, true +} + func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error { for key, values := range headers { for _, value := range values {