Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions internal/strategy/git/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,18 @@ func (s *Strategy) GenerateAndUploadMirrorSnapshot(ctx context.Context, repo *gi
func (s *Strategy) CacheBundle(ctx context.Context, key cache.Key, r io.Reader) error {
// Pure pass-through to the unexported cacheBundle: the arguments and the
// returned error are forwarded unchanged.
return s.cacheBundle(ctx, key, r)
}

// ReadSnapshotPointer exports readSnapshotPointer for testing.
//
// It forwards ctx and upstreamURL unchanged and returns readSnapshotPointer's
// two results as-is: the recorded snapshot commit and whether one was found.
func (s *Strategy) ReadSnapshotPointer(ctx context.Context, upstreamURL string) (string, bool) {
return s.readSnapshotPointer(ctx, upstreamURL)
}

// SnapshotCommitCacheKey exports snapshotCommitCacheKey for testing.
//
// It returns the cache key that snapshotCommitCacheKey derives from
// upstreamURL and commit, with no additional processing.
func SnapshotCommitCacheKey(upstreamURL, commit string) cache.Key {
return snapshotCommitCacheKey(upstreamURL, commit)
}

// ParseImmutableSnapshotPath exports parseImmutableSnapshotPath for testing.
//
// The three results are parseImmutableSnapshotPath's repo path, commit, and ok
// flag, returned in that order.
func ParseImmutableSnapshotPath(pathValue string) (string, string, bool) {
return parseImmutableSnapshotPath(pathValue)
}
9 changes: 9 additions & 0 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
return
}

// Least specific of the snapshot routes: a content-addressed blob at
// <repo>/snapshot/<commit>.tar.zst. Checked after the exact-suffix routes
// above so repos/groups named "snapshot" still resolve correctly.
if strings.Contains(pathValue, "/snapshot/") && strings.HasSuffix(pathValue, ".tar.zst") {
s.metrics.recordRequest(ctx, "snapshot-immutable")
s.handleImmutableSnapshotRequest(w, r, host, pathValue)
return
}

service := r.URL.Query().Get("service")
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")

Expand Down
196 changes: 191 additions & 5 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ func snapshotCacheKey(upstreamURL string) cache.Key {
return cache.NewKey(upstreamURL + ".snapshot")
}

// snapshotCommitCacheKey returns the cache key of the immutable,
// content-addressed snapshot blob for upstreamURL at commit.
//
// The commit is part of the key, so each generation targets a new key instead
// of replacing an existing blob. That gives three properties: every replica
// resolves a given key to byte-identical bytes, byte ranges fetched in parallel
// can be stitched together safely, and a download already in flight keeps
// working across a regeneration (which writes a different key) until the old
// blob ages out via TTL.
func snapshotCommitCacheKey(upstreamURL, commit string) cache.Key {
	name := upstreamURL + ".snapshot." + commit
	return cache.NewKey(name)
}

// snapshotPointerCacheKey is the key for the small mutable pointer that records
// the current snapshot commit for a repo. It is resolved once per download and
// never range-served, so its mutation is harmless.
//
// The key is upstreamURL with ".snapshot.current" appended, which is the same
// string snapshotCommitCacheKey builds for a commit named "current".
// NOTE(review): presumably commits are always SHAs so the two never overlap —
// confirm that no caller can pass an arbitrary commit string.
func snapshotPointerCacheKey(upstreamURL string) cache.Key {
return cache.NewKey(upstreamURL + ".snapshot.current")
}

// mirrorSnapshotCacheKey returns the cache key built from upstreamURL with the
// ".mirror-snapshot" suffix appended.
func mirrorSnapshotCacheKey(upstreamURL string) cache.Key {
return cache.NewKey(upstreamURL + ".mirror-snapshot")
}
Expand All @@ -50,6 +66,47 @@ func lfsSnapshotCacheKey(upstreamURL string) cache.Key {
return cache.NewKey(upstreamURL + ".lfs-snapshot")
}

// maxPointerBytes bounds a pointer read; a commit SHA is well under this.
// readSnapshotPointer reads at most this many bytes of the pointer object.
const maxPointerBytes = 256

// writeSnapshotPointer stores commit as the current snapshot pointer for
// upstreamURL, as a text/plain object under the pointer cache key.
//
// Callers invoke it only once the snapshot blob has been durably written, so
// the pointer never names a blob that is missing. Any write failure is returned
// wrapped with "write snapshot pointer".
func (s *Strategy) writeSnapshotPointer(ctx context.Context, upstreamURL, commit string) error {
	pointerKey := snapshotPointerCacheKey(upstreamURL)
	hdr := http.Header{"Content-Type": {"text/plain"}}

	// The pointer body is just the commit string, with no trailing newline.
	emit := func(dst io.Writer) error {
		_, writeErr := io.WriteString(dst, commit)
		return errors.WithStack(writeErr)
	}

	writeErr := cache.WriteFunc(ctx, s.cache, pointerKey, hdr, 0, emit)
	return errors.Wrap(writeErr, "write snapshot pointer")
}

// readSnapshotPointer looks up the snapshot pointer for upstreamURL and returns
// the commit it records.
//
// The second result is false, and the commit empty, when the pointer cannot be
// opened, cannot be read, or contains only whitespace; this covers the case
// where no snapshot has been generated yet. At most maxPointerBytes are read.
func (s *Strategy) readSnapshotPointer(ctx context.Context, upstreamURL string) (string, bool) {
	rc, _, openErr := s.cache.Open(ctx, snapshotPointerCacheKey(upstreamURL))
	if openErr != nil || rc == nil {
		return "", false
	}
	defer rc.Close()

	raw, readErr := io.ReadAll(io.LimitReader(rc, maxPointerBytes))
	if readErr != nil {
		return "", false
	}

	// Surrounding whitespace is not part of the commit.
	if sha := strings.TrimSpace(string(raw)); sha != "" {
		return sha, true
	}
	return "", false
}

// immutableSnapshotPath builds the content-addressed snapshot URL for a repo,
// advertised to clients so they can fetch byte ranges in parallel.
func immutableSnapshotPath(upstreamURL, commit string) string {
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
if err != nil {
return ""
}
return fmt.Sprintf("/git/%s/snapshot/%s.tar.zst", repoPath, commit)
Comment thread
worstell marked this conversation as resolved.
}

// cloneForSnapshot clones the mirror into destDir under repo's read lock,
// then fixes the remote URL to point through cachew (or upstream).
func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Repository, destDir string) error {
Expand Down Expand Up @@ -133,22 +190,41 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone
mu.Lock()
defer mu.Unlock()

cacheKey := snapshotCacheKey(upstream)
var headSHA string
if err := s.withSnapshotClone(ctx, repo, "base", func(workDir string) error {
// Capture the snapshot's HEAD so we can later build a delta bundle between
// the cached snapshot and the current mirror state.
headSHA, err := revParse(ctx, workDir, "HEAD")
// Capture the snapshot's HEAD: it both content-addresses the immutable
// blob and lets clients build a delta bundle between the snapshot and the
// current mirror state.
sha, err := revParse(ctx, workDir, "HEAD")
if err != nil {
return errors.Wrap(err, "rev-parse HEAD for snapshot")
}
headSHA = sha

// Content-addressed blobs are write-once: a re-run for an unchanged HEAD
// would otherwise overwrite the blob with a byte-different archive (file
// mtimes, .git metadata), which could corrupt an in-flight parallel range
// download — the race this route exists to prevent. Skip if it exists.
commitKey := snapshotCommitCacheKey(upstream, headSHA)
if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil {
return nil
} else if !errors.Is(statErr, os.ErrNotExist) {
return errors.Wrap(statErr, "stat snapshot blob")
}

extraHeaders := http.Header{}
extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA)

return snapshot.Create(ctx, s.cache, cacheKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders)
return snapshot.Create(ctx, s.cache, commitKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid racing immutable snapshot writes across replicas

In a multi-pod deployment, two replicas generating the same new HEAD can both observe Stat as missing and then both call snapshot.Create for the same supposedly immutable key. The cache Create implementations replace the object on close, and these archives are not byte-stable across independent clones because of .git metadata/mtimes, so once the first replica has published the pointer and clients start parallel range requests, the second replica finishing later can swap the bytes under the same URL and reintroduce the mixed-range corruption this content-addressed route is meant to prevent. This needs cache-wide create-if-absent/CAS semantics or deterministic archive bytes before advertising the key as immutable.

Useful? React with 👍 / 👎.

}); err != nil {
return errors.Wrap(err, "create snapshot")
}

// Flip the pointer only after the blob is durably written.
if err := s.writeSnapshotPointer(ctx, upstream, headSHA); err != nil {
return err
}

s.metrics.recordOperation(ctx, "snapshot", "success", time.Since(start))
logger.InfoContext(ctx, "Snapshot generation completed", "upstream", upstream)
return nil
Expand Down Expand Up @@ -246,7 +322,31 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
return
}

// Resolve the content-addressed snapshot for this repo. Once a snapshot has
// been generated, every serve path below reads the immutable blob, and we
// advertise its stable URL via Content-Location so parallel-download clients
// can fetch byte ranges against an unchanging object. The legacy key is only
// used before the first snapshot exists (cold start), where the spool path
// generates one and writes the pointer.
cacheKey := snapshotCacheKey(upstreamURL)
var immutableLoc string
if commit, ok := s.readSnapshotPointer(ctx, upstreamURL); ok {
commitKey := snapshotCommitCacheKey(upstreamURL, commit)
// The pointer can outlive a blob that aged out, so only treat the
// immutable key as servable when it's present.
if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil {
cacheKey = commitKey
immutableLoc = immutableSnapshotPath(upstreamURL, commit)
}
}
// Advertise the immutable URL only from a path that actually opened the
// immutable blob, so an eviction between the Stat above and the open never
// leaves clients pointed at a URL that 404s.
setImmutableLocation := func() {
if immutableLoc != "" {
w.Header().Set("Content-Location", immutableLoc)
}
}

// On cold start the local mirror may not be ready yet. Check the S3 cache
// first so we can stream a cached snapshot to the client immediately while
Expand All @@ -265,6 +365,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
winner.serving.Done()
}()
logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL)
setImmutableLocation()
w.Header().Set("Content-Type", "application/zstd")
n, err := serveReaderFast(w, r, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start))
Expand All @@ -284,6 +385,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
reader, _, openErr := s.cache.Open(ctx, cacheKey)
if openErr == nil && reader != nil {
logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL)
setImmutableLocation()
w.Header().Set("Content-Type", "application/zstd")
n, err := serveReaderFast(w, r, reader)
s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start))
Expand Down Expand Up @@ -329,13 +431,97 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
}
defer reader.Close()

setImmutableLocation()
if err := s.serveSnapshotWithBundle(ctx, w, r, reader, headers, repo, upstreamURL, repoName, start); err != nil {
logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}

// handleImmutableSnapshotRequest serves a content-addressed snapshot blob at
// /git/<repo>/snapshot/<commit>.tar.zst. Because the blob is immutable, any
// replica returns identical bytes, so http.ServeContent's standard Range
// support lets a client download chunks in parallel and stitch them safely —
// no pin protocol, and an in-flight download survives a regeneration (which
// writes a different commit key) until the old blob ages out via TTL.
//
// Responses:
//   - 400 when pathValue does not parse as an immutable snapshot path.
//   - 404 when the blob cannot be opened. The client is expected to re-resolve
//     through the stable snapshot URL. Cache failures other than "not found"
//     are also answered with 404, but are logged and recorded on the span so
//     they are not mistaken for ordinary misses.
//   - Otherwise the blob is streamed with the cached headers, a long-lived
//     immutable Cache-Control, and Accept-Ranges only when served from disk.
func (s *Strategy) handleImmutableSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
	start := time.Now()
	repoPath, commit, ok := parseImmutableSnapshotPath(pathValue)
	if !ok {
		http.Error(w, "malformed immutable snapshot path", http.StatusBadRequest)
		return
	}
	upstreamURL := "https://" + host + "/" + repoPath
	repoName := host + "/" + repoPath

	ctx, span := tracer.Start(r.Context(), "git.snapshot.serve_immutable",
		trace.WithAttributes(
			attribute.String("cachew.operation", "snapshot_serve_immutable"),
			attribute.String("cachew.upstream", upstreamURL),
			attribute.String("cachew.repository", repoName),
			attribute.String("cachew.commit", commit),
		),
	)
	defer span.End()
	r = r.WithContext(ctx)
	logger := logging.FromContext(ctx)

	reader, headers, err := s.cache.Open(ctx, snapshotCommitCacheKey(upstreamURL, commit))
	if err != nil || reader == nil {
		// Unknown commit or aged out of cache; the client should re-resolve via
		// the stable snapshot URL, which regenerates and updates the pointer.
		//
		// A genuine cache failure gets the same 404 so clients take the same
		// fallback, but it must not vanish silently.
		// NOTE(review): assumes Open reports a miss as os.ErrNotExist, as Stat
		// does — confirm, otherwise every miss will be logged here.
		if err != nil && !errors.Is(err, os.ErrNotExist) {
			logger.WarnContext(ctx, "Failed to open immutable snapshot", "upstream", upstreamURL, "commit", commit, "error", err)
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}
		http.Error(w, "snapshot revision not available", http.StatusNotFound)
		return
	}
	defer reader.Close()

	// Replay the headers stored alongside the blob.
	for key, values := range headers {
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	// The blob is immutable, so it is safe to cache aggressively downstream.
	w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
	// Advertise range support only on the disk tier, where http.ServeContent can
	// satisfy ranges via sendfile. The cold S3 path streams a full body and can't
	// honor ranges, so claiming it would force clients into a fail-closed restart.
	if _, onDisk := reader.(*os.File); onDisk {
		w.Header().Set("Accept-Ranges", "bytes")
	}
	// Fall back to a default type only when the stored headers did not set one.
	if w.Header().Get("Content-Type") == "" {
		w.Header().Set("Content-Type", "application/zstd")
	}

	n, err := serveReaderFast(w, r, reader)
	s.metrics.recordSnapshotServe(ctx, "immutable", repoName, n, time.Since(start))
	span.SetAttributes(attribute.String("cachew.source", "immutable"), attribute.Int64("cachew.bytes", n))
	if err != nil {
		logger.WarnContext(ctx, "Failed to stream immutable snapshot", "upstream", upstreamURL, "error", err)
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
}

// parseImmutableSnapshotPath splits "<repo>/snapshot/<commit>.tar.zst" into the
// repo path and commit.
//
// ok is false, with both strings empty, when any of the following holds:
//   - pathValue contains no "/snapshot/" segment or does not end in ".tar.zst";
//   - the commit is empty or contains anything other than hexadecimal digits;
//   - ExtractRepoPath yields an empty repo path for the part before the last
//     "/snapshot/".
//
// The commit comes from an untrusted URL and is used to build a cache key, so
// it is restricted to hex digits. That rules out slashes and also names such as
// "current", which would otherwise resolve to the mutable pointer object and be
// served as if it were an immutable blob.
func parseImmutableSnapshotPath(pathValue string) (repoPath, commit string, ok bool) {
	const (
		infix  = "/snapshot/"
		suffix = ".tar.zst"
	)
	// The last occurrence is used so repos or groups named "snapshot" still parse.
	idx := strings.LastIndex(pathValue, infix)
	if idx < 0 || !strings.HasSuffix(pathValue, suffix) {
		return "", "", false
	}
	commit = strings.TrimSuffix(pathValue[idx+len(infix):], suffix)
	if commit == "" {
		return "", "", false
	}
	for i := 0; i < len(commit); i++ {
		c := commit[i]
		isHex := (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')
		if !isHex {
			return "", "", false
		}
	}
	repoPath = ExtractRepoPath(pathValue[:idx])
	if repoPath == "" {
		return "", "", false
	}
	return repoPath, commit, true
}

func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error {
for key, values := range headers {
for _, value := range values {
Expand Down
Loading