-
Notifications
You must be signed in to change notification settings - Fork 5
feat(git): content-addressed immutable snapshots #325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,22 @@ func snapshotCacheKey(upstreamURL string) cache.Key { | |
| return cache.NewKey(upstreamURL + ".snapshot") | ||
| } | ||
|
|
||
| // snapshotCommitCacheKey is the key for an immutable, content-addressed snapshot | ||
| // blob. Each generation writes a new key (keyed by the snapshot's commit), so a | ||
| // blob is never overwritten: any replica resolves the same key to byte-identical | ||
| // bytes, parallel ranges stitch safely, and an in-flight download survives a | ||
| // regeneration (which writes a new key) until the old blob ages out via TTL. | ||
| func snapshotCommitCacheKey(upstreamURL, commit string) cache.Key { | ||
| return cache.NewKey(upstreamURL + ".snapshot." + commit) | ||
| } | ||
|
|
||
| // snapshotPointerCacheKey is the key for the small mutable pointer that records | ||
| // the current snapshot commit for a repo. It is resolved once per download and | ||
| // never range-served, so its mutation is harmless. | ||
| func snapshotPointerCacheKey(upstreamURL string) cache.Key { | ||
| return cache.NewKey(upstreamURL + ".snapshot.current") | ||
| } | ||
|
|
||
| func mirrorSnapshotCacheKey(upstreamURL string) cache.Key { | ||
| return cache.NewKey(upstreamURL + ".mirror-snapshot") | ||
| } | ||
|
|
@@ -50,6 +66,47 @@ func lfsSnapshotCacheKey(upstreamURL string) cache.Key { | |
| return cache.NewKey(upstreamURL + ".lfs-snapshot") | ||
| } | ||
|
|
||
| // maxPointerBytes bounds a pointer read; a commit SHA is well under this. | ||
| const maxPointerBytes = 256 | ||
|
|
||
| // writeSnapshotPointer records commit as the current snapshot for upstreamURL. | ||
| // Called after the blob is durably written so the pointer never references a | ||
| // missing blob. | ||
| func (s *Strategy) writeSnapshotPointer(ctx context.Context, upstreamURL, commit string) error { | ||
| headers := http.Header{"Content-Type": {"text/plain"}} | ||
| err := cache.WriteFunc(ctx, s.cache, snapshotPointerCacheKey(upstreamURL), headers, 0, func(w io.Writer) error { | ||
| _, err := io.WriteString(w, commit) | ||
| return errors.WithStack(err) | ||
| }) | ||
| return errors.Wrap(err, "write snapshot pointer") | ||
| } | ||
|
|
||
| // readSnapshotPointer returns the current snapshot commit for upstreamURL, or | ||
| // ("", false) if no snapshot has been generated yet. | ||
| func (s *Strategy) readSnapshotPointer(ctx context.Context, upstreamURL string) (string, bool) { | ||
| reader, _, err := s.cache.Open(ctx, snapshotPointerCacheKey(upstreamURL)) | ||
| if err != nil || reader == nil { | ||
| return "", false | ||
| } | ||
| defer reader.Close() | ||
| b, err := io.ReadAll(io.LimitReader(reader, maxPointerBytes)) | ||
| if err != nil { | ||
| return "", false | ||
| } | ||
| commit := strings.TrimSpace(string(b)) | ||
| return commit, commit != "" | ||
| } | ||
|
|
||
| // immutableSnapshotPath builds the content-addressed snapshot URL for a repo, | ||
| // advertised to clients so they can fetch byte ranges in parallel. | ||
| func immutableSnapshotPath(upstreamURL, commit string) string { | ||
| repoPath, err := gitclone.RepoPathFromURL(upstreamURL) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| return fmt.Sprintf("/git/%s/snapshot/%s.tar.zst", repoPath, commit) | ||
| } | ||
|
|
||
| // cloneForSnapshot clones the mirror into destDir under repo's read lock, | ||
| // then fixes the remote URL to point through cachew (or upstream). | ||
| func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Repository, destDir string) error { | ||
|
|
@@ -133,22 +190,41 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone | |
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| cacheKey := snapshotCacheKey(upstream) | ||
| var headSHA string | ||
| if err := s.withSnapshotClone(ctx, repo, "base", func(workDir string) error { | ||
| // Capture the snapshot's HEAD so we can later build a delta bundle between | ||
| // the cached snapshot and the current mirror state. | ||
| headSHA, err := revParse(ctx, workDir, "HEAD") | ||
| // Capture the snapshot's HEAD: it both content-addresses the immutable | ||
| // blob and lets clients build a delta bundle between the snapshot and the | ||
| // current mirror state. | ||
| sha, err := revParse(ctx, workDir, "HEAD") | ||
| if err != nil { | ||
| return errors.Wrap(err, "rev-parse HEAD for snapshot") | ||
| } | ||
| headSHA = sha | ||
|
|
||
| // Content-addressed blobs are write-once: a re-run for an unchanged HEAD | ||
| // would otherwise overwrite the blob with a byte-different archive (file | ||
| // mtimes, .git metadata), which could corrupt an in-flight parallel range | ||
| // download — the race this route exists to prevent. Skip if it exists. | ||
| commitKey := snapshotCommitCacheKey(upstream, headSHA) | ||
| if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil { | ||
| return nil | ||
| } else if !errors.Is(statErr, os.ErrNotExist) { | ||
| return errors.Wrap(statErr, "stat snapshot blob") | ||
| } | ||
|
|
||
| extraHeaders := http.Header{} | ||
| extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) | ||
|
|
||
| return snapshot.Create(ctx, s.cache, cacheKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders) | ||
| return snapshot.Create(ctx, s.cache, commitKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In a multi-pod deployment, two replicas generating the same new HEAD can both observe Useful? React with 👍 / 👎. |
||
| }); err != nil { | ||
| return errors.Wrap(err, "create snapshot") | ||
| } | ||
|
|
||
| // Flip the pointer only after the blob is durably written. | ||
| if err := s.writeSnapshotPointer(ctx, upstream, headSHA); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| s.metrics.recordOperation(ctx, "snapshot", "success", time.Since(start)) | ||
| logger.InfoContext(ctx, "Snapshot generation completed", "upstream", upstream) | ||
| return nil | ||
|
|
@@ -246,7 +322,31 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, | |
| return | ||
| } | ||
|
|
||
| // Resolve the content-addressed snapshot for this repo. Once a snapshot has | ||
| // been generated, every serve path below reads the immutable blob, and we | ||
| // advertise its stable URL via Content-Location so parallel-download clients | ||
| // can fetch byte ranges against an unchanging object. The legacy key is only | ||
| // used before the first snapshot exists (cold start), where the spool path | ||
| // generates one and writes the pointer. | ||
| cacheKey := snapshotCacheKey(upstreamURL) | ||
| var immutableLoc string | ||
| if commit, ok := s.readSnapshotPointer(ctx, upstreamURL); ok { | ||
| commitKey := snapshotCommitCacheKey(upstreamURL, commit) | ||
| // The pointer can outlive a blob that aged out, so only treat the | ||
| // immutable key as servable when it's present. | ||
| if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil { | ||
| cacheKey = commitKey | ||
| immutableLoc = immutableSnapshotPath(upstreamURL, commit) | ||
| } | ||
| } | ||
| // Advertise the immutable URL only from a path that actually opened the | ||
| // immutable blob, so an eviction between the Stat above and the open never | ||
| // leaves clients pointed at a URL that 404s. | ||
| setImmutableLocation := func() { | ||
| if immutableLoc != "" { | ||
| w.Header().Set("Content-Location", immutableLoc) | ||
| } | ||
| } | ||
|
|
||
| // On cold start the local mirror may not be ready yet. Check the S3 cache | ||
| // first so we can stream a cached snapshot to the client immediately while | ||
|
|
@@ -265,6 +365,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, | |
| winner.serving.Done() | ||
| }() | ||
| logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL) | ||
| setImmutableLocation() | ||
| w.Header().Set("Content-Type", "application/zstd") | ||
| n, err := serveReaderFast(w, r, reader) | ||
| s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) | ||
|
|
@@ -284,6 +385,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, | |
| reader, _, openErr := s.cache.Open(ctx, cacheKey) | ||
| if openErr == nil && reader != nil { | ||
| logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL) | ||
| setImmutableLocation() | ||
| w.Header().Set("Content-Type", "application/zstd") | ||
| n, err := serveReaderFast(w, r, reader) | ||
| s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) | ||
|
|
@@ -329,13 +431,97 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, | |
| } | ||
| defer reader.Close() | ||
|
|
||
| setImmutableLocation() | ||
| if err := s.serveSnapshotWithBundle(ctx, w, r, reader, headers, repo, upstreamURL, repoName, start); err != nil { | ||
| logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err) | ||
| span.RecordError(err) | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| } | ||
| } | ||
|
|
||
| // handleImmutableSnapshotRequest serves a content-addressed snapshot blob at | ||
| // /git/<repo>/snapshot/<commit>.tar.zst. Because the blob is immutable, any | ||
| // replica returns identical bytes, so http.ServeContent's standard Range | ||
| // support lets a client download chunks in parallel and stitch them safely — | ||
| // no pin protocol, and an in-flight download survives a regeneration (which | ||
| // writes a different commit key) until the old blob ages out via TTL. | ||
| func (s *Strategy) handleImmutableSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { | ||
| start := time.Now() | ||
| repoPath, commit, ok := parseImmutableSnapshotPath(pathValue) | ||
| if !ok { | ||
| http.Error(w, "malformed immutable snapshot path", http.StatusBadRequest) | ||
| return | ||
| } | ||
| upstreamURL := "https://" + host + "/" + repoPath | ||
| repoName := host + "/" + repoPath | ||
|
|
||
| ctx, span := tracer.Start(r.Context(), "git.snapshot.serve_immutable", | ||
| trace.WithAttributes( | ||
| attribute.String("cachew.operation", "snapshot_serve_immutable"), | ||
| attribute.String("cachew.upstream", upstreamURL), | ||
| attribute.String("cachew.repository", repoName), | ||
| attribute.String("cachew.commit", commit), | ||
| ), | ||
| ) | ||
| defer span.End() | ||
| r = r.WithContext(ctx) | ||
| logger := logging.FromContext(ctx) | ||
|
|
||
| reader, headers, err := s.cache.Open(ctx, snapshotCommitCacheKey(upstreamURL, commit)) | ||
| if err != nil || reader == nil { | ||
| // Unknown commit or aged out of cache; the client should re-resolve via | ||
| // the stable snapshot URL, which regenerates and updates the pointer. | ||
| http.Error(w, "snapshot revision not available", http.StatusNotFound) | ||
| return | ||
| } | ||
| defer reader.Close() | ||
|
|
||
| for key, values := range headers { | ||
| for _, value := range values { | ||
| w.Header().Add(key, value) | ||
| } | ||
| } | ||
| // The blob is immutable, so it is safe to cache aggressively downstream. | ||
| w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") | ||
| // Advertise range support only on the disk tier, where http.ServeContent can | ||
| // satisfy ranges via sendfile. The cold S3 path streams a full body and can't | ||
| // honor ranges, so claiming it would force clients into a fail-closed restart. | ||
| if _, onDisk := reader.(*os.File); onDisk { | ||
| w.Header().Set("Accept-Ranges", "bytes") | ||
| } | ||
| if w.Header().Get("Content-Type") == "" { | ||
| w.Header().Set("Content-Type", "application/zstd") | ||
| } | ||
|
|
||
| n, err := serveReaderFast(w, r, reader) | ||
| s.metrics.recordSnapshotServe(ctx, "immutable", repoName, n, time.Since(start)) | ||
| span.SetAttributes(attribute.String("cachew.source", "immutable"), attribute.Int64("cachew.bytes", n)) | ||
| if err != nil { | ||
| logger.WarnContext(ctx, "Failed to stream immutable snapshot", "upstream", upstreamURL, "error", err) | ||
| span.RecordError(err) | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| } | ||
| } | ||
|
|
||
| // parseImmutableSnapshotPath splits "<repo>/snapshot/<commit>.tar.zst" into the | ||
| // repo path and commit. The commit must be non-empty and slash-free. | ||
| func parseImmutableSnapshotPath(pathValue string) (repoPath, commit string, ok bool) { | ||
| const infix = "/snapshot/" | ||
| idx := strings.LastIndex(pathValue, infix) | ||
| if idx < 0 { | ||
| return "", "", false | ||
| } | ||
| commit = strings.TrimSuffix(pathValue[idx+len(infix):], ".tar.zst") | ||
| if commit == "" || strings.Contains(commit, "/") { | ||
| return "", "", false | ||
| } | ||
| repoPath = ExtractRepoPath(pathValue[:idx]) | ||
| if repoPath == "" { | ||
| return "", "", false | ||
| } | ||
| return repoPath, commit, true | ||
| } | ||
|
|
||
| func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error { | ||
| for key, values := range headers { | ||
| for _, value := range values { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.