Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions internal/cache/pinned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cache

import (
"context"
"io"
"net/http"

"github.com/alecthomas/errors"
)

// ErrPinStale is returned when the pinned object revision no longer exists
// (e.g. the snapshot was regenerated mid-download). Callers should re-pin and
// restart the transfer.
var ErrPinStale = errors.New("pinned object revision is stale")

// PinnedObject describes an immutable revision of a cached object. The opaque
// Pin token lets a client fetch byte ranges that all resolve to the same
// revision, so parallel ranges stitch correctly even across cachew replicas.
type PinnedObject struct {
// Pin is an opaque token identifying the object revision. It is echoed back
// to the client and supplied on subsequent range requests.
Pin string
// Size is the total object size in bytes.
Size int64
// Headers are the stored object headers (Content-Type, Last-Modified, etc.).
Headers http.Header
}

// PinnedRangeCache is implemented by caches that can serve a byte range from a
// specific immutable object revision. It backs parallel snapshot downloads:
// the client pins once, then fetches ranges concurrently against that revision.
type PinnedRangeCache interface {
// Pin stats the object and returns an opaque revision token plus size and
// headers. Returns os.ErrNotExist if the object is absent.
Pin(ctx context.Context, key Key) (PinnedObject, error)
// OpenPinnedRange returns a reader for bytes [start, end] of the revision
// identified by pin. Returns ErrPinStale if the revision is gone.
OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error)
}
8 changes: 6 additions & 2 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ func (r *s3Reader) Read(p []byte) (int, error) {
if err == nil || errors.Is(err, io.EOF) {
return n, err //nolint:wrapcheck
}
// Convert NoSuchKey to os.ErrNotExist for consistency
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == s3ErrNoSuchKey {
switch {
case errResponse.Code == s3ErrNoSuchKey:
// Convert NoSuchKey to os.ErrNotExist for consistency.
return n, os.ErrNotExist
case errResponse.StatusCode == http.StatusPreconditionFailed || errResponse.Code == "PreconditionFailed":
// A pinned (If-Match ETag) range whose object was overwritten.
return n, ErrPinStale
}
return n, errors.WithStack(err)
}
Expand Down
60 changes: 60 additions & 0 deletions internal/cache/s3_pinned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cache

import (
"context"
"io"
"strconv"
"strings"

"github.com/alecthomas/errors"
"github.com/minio/minio-go/v7"
)

// pinETagPrefix tags ETag-based pin tokens. Keeping the token prefixed lets us
// add a "version:" form later (S3 VersionId) without changing the wire format.
const pinETagPrefix = "etag:"

var _ PinnedRangeCache = (*S3)(nil)

// Pin returns an opaque revision token (the S3 ETag) for the object, so that
// subsequent range reads can be pinned to this exact revision.
func (s *S3) Pin(ctx context.Context, key Key) (PinnedObject, error) {
objInfo, headers, err := s.statAndHeaders(ctx, key)
if err != nil {
return PinnedObject{}, err
}
headers.Set("Content-Length", strconv.FormatInt(objInfo.Size, 10))
return PinnedObject{
Pin: pinETagPrefix + objInfo.ETag,
Size: objInfo.Size,
Headers: headers,
}, nil
}

// OpenPinnedRange serves bytes [start, end] from the revision identified by pin.
// The ETag pin fails closed: if the object was overwritten the request returns
// ErrPinStale rather than mixing old and new bytes.
func (s *S3) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) {
etag, ok := strings.CutPrefix(pin, pinETagPrefix)
if !ok {
return nil, errors.Errorf("unsupported pin token %q", pin)
}

objectName := s.keyToPath(s.namespace, key)
opts := minio.GetObjectOptions{}
if err := opts.SetRange(start, end); err != nil {
return nil, errors.Errorf("set range %d-%d: %w", start, end, err)
}
if err := opts.SetMatchETag(etag); err != nil {
return nil, errors.Errorf("set etag %s: %w", etag, err)
}

// GetObject is lazy and must not be Stat()'d here: calling Stat() after
// SetRange makes minio re-fetch the whole object. Errors (including a 412
// for a stale ETag pin) surface on Read and are mapped by s3Reader.Read.
obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, opts)
if err != nil {
return nil, errors.Errorf("get pinned range: %w", err)
}
return &s3Reader{obj: obj}, nil
}
78 changes: 78 additions & 0 deletions internal/cache/s3_pinned_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cache_test

import (
"bytes"
"crypto/rand"
"io"
"testing"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/s3client/s3clienttest"
)

// TestS3PinnedRange verifies that an object can be pinned and reassembled
// byte-for-byte from independent ranged reads, and that overwriting the object
// invalidates the old pin (fail-closed) rather than mixing revisions.
func TestS3PinnedRange(t *testing.T) {
bucket := s3clienttest.Start(t)
c := newS3Cache(t, bucket)
defer c.Close()

pc, ok := c.(cache.PinnedRangeCache)
assert.True(t, ok, "S3 cache must implement PinnedRangeCache")

ctx := t.Context()
key := cache.NewKey("pinned-object")

// 10 MiB of random data so multiple distinct ranges are exercised.
data := make([]byte, 10<<20)
_, err := rand.Read(data)
assert.NoError(t, err)

w, err := c.Create(ctx, key, nil, 0)
assert.NoError(t, err)
_, err = w.Write(data)
assert.NoError(t, err)
assert.NoError(t, w.Close())

pin, err := pc.Pin(ctx, key)
assert.NoError(t, err)
assert.Equal(t, int64(len(data)), pin.Size)
assert.NotZero(t, pin.Pin)

// Reassemble from four ranges in arbitrary order.
chunk := int64(len(data)) / 4
bounds := [][2]int64{
{2 * chunk, 3*chunk - 1},
{0, chunk - 1},
{3 * chunk, int64(len(data)) - 1},
{chunk, 2*chunk - 1},
}
got := make([]byte, len(data))
for _, b := range bounds {
r, err := pc.OpenPinnedRange(ctx, key, pin.Pin, b[0], b[1])
assert.NoError(t, err)
part, err := io.ReadAll(r)
assert.NoError(t, err)
assert.NoError(t, r.Close())
assert.Equal(t, b[1]-b[0]+1, int64(len(part)))
copy(got[b[0]:], part)
}
assert.True(t, bytes.Equal(data, got), "stitched ranges must match original")

// Overwrite the object: the old pin must now fail closed.
w2, err := c.Create(ctx, key, nil, 0)
assert.NoError(t, err)
_, err = w2.Write(make([]byte, len(data)))
assert.NoError(t, err)
assert.NoError(t, w2.Close())

// The pin fails closed: the ETag mismatch surfaces on read.
stale, err := pc.OpenPinnedRange(ctx, key, pin.Pin, 0, chunk-1)
assert.NoError(t, err)
_, err = io.ReadAll(stale)
assert.NoError(t, stale.Close())
assert.IsError(t, err, cache.ErrPinStale)
}
33 changes: 33 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,36 @@ func (t Tiered) ListNamespaces(ctx context.Context) ([]string, error) {
sort.Strings(namespaces)
return namespaces, nil
}

var _ PinnedRangeCache = (*Tiered)(nil)

// Pin delegates to the first tier that supports pinned range serving (the S3
// tier). The per-pod disk tier is deliberately skipped: only the shared,
// authoritative object can be pinned consistently across replicas.
func (t Tiered) Pin(ctx context.Context, key Key) (PinnedObject, error) {
pc := t.pinnedTier()
if pc == nil {
return PinnedObject{}, errors.New("no pinnable cache tier configured")
}
obj, err := pc.Pin(ctx, key)
return obj, errors.WithStack(err)
}

// OpenPinnedRange delegates to the pinnable tier (S3), bypassing disk.
func (t Tiered) OpenPinnedRange(ctx context.Context, key Key, pin string, start, end int64) (io.ReadCloser, error) {
pc := t.pinnedTier()
if pc == nil {
return nil, errors.New("no pinnable cache tier configured")
}
rc, err := pc.OpenPinnedRange(ctx, key, pin, start, end)
return rc, errors.WithStack(err)
}

func (t Tiered) pinnedTier() PinnedRangeCache {
for _, c := range t.caches {
if pc, ok := c.(PinnedRangeCache); ok {
return pc
}
}
return nil
}
68 changes: 68 additions & 0 deletions internal/strategy/git/parse_range_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package git //nolint:testpackage // white-box test for unexported range helpers

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
)

func TestParseSingleByteRange(t *testing.T) {
tests := []struct {
header string
start, end int64
ok bool
}{
{"bytes=0-99", 0, 99, true},
{"bytes=100-199", 100, 199, true},
{"bytes=5-5", 5, 5, true},
{"", 0, 0, false},
{"bytes=0-", 0, 0, false}, // open-ended rejected
{"bytes=-100", 0, 0, false}, // suffix rejected
{"bytes=0-99,200-299", 0, 0, false}, // multi-range rejected
{"bytes=99-0", 0, 0, false}, // end < start rejected
{"items=0-99", 0, 0, false}, // wrong unit
{"bytes=a-b", 0, 0, false}, // non-numeric
}
for _, tt := range tests {
start, end, ok := parseSingleByteRange(tt.header)
assert.Equal(t, tt.ok, ok, "ok for %q", tt.header)
if tt.ok {
assert.Equal(t, tt.start, start, "start for %q", tt.header)
assert.Equal(t, tt.end, end, "end for %q", tt.header)
}
}
}

// failPinCache fails the test if the pinned-range handler reaches the cache,
// proving the oversized-range guard rejects the request before any read.
type failPinCache struct{ t *testing.T }

func (p failPinCache) Pin(context.Context, cache.Key) (cache.PinnedObject, error) {
p.t.Fatal("Pin must not be called")
return cache.PinnedObject{}, nil //nolint:nilnil // unreachable; t.Fatal above
}

func (p failPinCache) OpenPinnedRange(context.Context, cache.Key, string, int64, int64) (io.ReadCloser, error) {
p.t.Fatal("OpenPinnedRange must not be called for an oversized range")
return nil, nil //nolint:nilnil // unreachable; t.Fatal above
}

func TestServePinRangeRejectsOversizedRange(t *testing.T) {
s := &Strategy{}
req := httptest.NewRequest(http.MethodGet, "/git/x/snapshot.tar.zst", nil)
// Length = maxPinRangeBytes + 1, one byte over the cap.
req.Header.Set("Range", fmt.Sprintf("bytes=0-%d", int64(maxPinRangeBytes)))
w := httptest.NewRecorder()

s.servePinRange(w, req, failPinCache{t}, cache.NewKey("k"), "repo", "etag:x", time.Now())

assert.Equal(t, http.StatusRequestedRangeNotSatisfiable, w.Code)
}
Loading