From 216a22a1c56f77397f8fffdc420040b7f8539237 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Fri, 12 Jun 2026 11:23:28 -0700 Subject: [PATCH] feat(git): content-addressed immutable snapshots Snapshots were written to a single mutable key (upstream+".snapshot"), which made range requests unsafe: a regeneration could swap the bytes mid-download, so parallel/ranged fetches needed bespoke pinning. Write each snapshot once under a commit-addressed key and keep a tiny mutable pointer to the current commit. The stable snapshot URL resolves the pointer and advertises the immutable URL via Content-Location; a new /git//snapshot/.tar.zst route serves the blob with standard HTTP Range support (zero-copy sendfile on the disk tier). In-flight downloads survive regenerations until the old blob ages out via TTL. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019eae3d-a2fd-70ca-80fe-a7536ec6748c --- internal/strategy/git/export_test.go | 15 ++ internal/strategy/git/git.go | 9 + internal/strategy/git/snapshot.go | 196 +++++++++++++++++- internal/strategy/git/snapshot_test.go | 269 ++++++++++++++++++++++++- 4 files changed, 480 insertions(+), 9 deletions(-) diff --git a/internal/strategy/git/export_test.go b/internal/strategy/git/export_test.go index 9184c1b..8ef636f 100644 --- a/internal/strategy/git/export_test.go +++ b/internal/strategy/git/export_test.go @@ -24,3 +24,18 @@ func (s *Strategy) GenerateAndUploadMirrorSnapshot(ctx context.Context, repo *gi func (s *Strategy) CacheBundle(ctx context.Context, key cache.Key, r io.Reader) error { return s.cacheBundle(ctx, key, r) } + +// ReadSnapshotPointer exports readSnapshotPointer for testing. +func (s *Strategy) ReadSnapshotPointer(ctx context.Context, upstreamURL string) (string, bool) { + return s.readSnapshotPointer(ctx, upstreamURL) +} + +// SnapshotCommitCacheKey exports snapshotCommitCacheKey for testing. +func SnapshotCommitCacheKey(upstreamURL, commit string) cache.Key { + return snapshotCommitCacheKey(upstreamURL, commit) +} + +// ParseImmutableSnapshotPath exports parseImmutableSnapshotPath for testing. +func ParseImmutableSnapshotPath(pathValue string) (string, string, bool) { + return parseImmutableSnapshotPath(pathValue) +} diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 261b3d3..0d93a95 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -295,6 +295,15 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { return } + // Least specific of the snapshot routes: a content-addressed blob at + // /snapshot/.tar.zst. Checked after the exact-suffix routes + // above so repos/groups named "snapshot" still resolve correctly. + if strings.Contains(pathValue, "/snapshot/") && strings.HasSuffix(pathValue, ".tar.zst") { + s.metrics.recordRequest(ctx, "snapshot-immutable") + s.handleImmutableSnapshotRequest(w, r, host, pathValue) + return + } + service := r.URL.Query().Get("service") isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack") diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 28b1389..45902ce 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -38,6 +38,22 @@ func snapshotCacheKey(upstreamURL string) cache.Key { return cache.NewKey(upstreamURL + ".snapshot") } +// snapshotCommitCacheKey is the key for an immutable, content-addressed snapshot +// blob. Each generation writes a new key (keyed by the snapshot's commit), so a +// blob is never overwritten: any replica resolves the same key to byte-identical +// bytes, parallel ranges stitch safely, and an in-flight download survives a +// regeneration (which writes a new key) until the old blob ages out via TTL. +func snapshotCommitCacheKey(upstreamURL, commit string) cache.Key { + return cache.NewKey(upstreamURL + ".snapshot." + commit) +} + +// snapshotPointerCacheKey is the key for the small mutable pointer that records +// the current snapshot commit for a repo. It is resolved once per download and +// never range-served, so its mutation is harmless. +func snapshotPointerCacheKey(upstreamURL string) cache.Key { + return cache.NewKey(upstreamURL + ".snapshot.current") +} + func mirrorSnapshotCacheKey(upstreamURL string) cache.Key { return cache.NewKey(upstreamURL + ".mirror-snapshot") } @@ -50,6 +66,47 @@ func lfsSnapshotCacheKey(upstreamURL string) cache.Key { return cache.NewKey(upstreamURL + ".lfs-snapshot") } +// maxPointerBytes bounds a pointer read; a commit SHA is well under this. +const maxPointerBytes = 256 + +// writeSnapshotPointer records commit as the current snapshot for upstreamURL. +// Called after the blob is durably written so the pointer never references a +// missing blob. +func (s *Strategy) writeSnapshotPointer(ctx context.Context, upstreamURL, commit string) error { + headers := http.Header{"Content-Type": {"text/plain"}} + err := cache.WriteFunc(ctx, s.cache, snapshotPointerCacheKey(upstreamURL), headers, 0, func(w io.Writer) error { + _, err := io.WriteString(w, commit) + return errors.WithStack(err) + }) + return errors.Wrap(err, "write snapshot pointer") +} + +// readSnapshotPointer returns the current snapshot commit for upstreamURL, or +// ("", false) if no snapshot has been generated yet. +func (s *Strategy) readSnapshotPointer(ctx context.Context, upstreamURL string) (string, bool) { + reader, _, err := s.cache.Open(ctx, snapshotPointerCacheKey(upstreamURL)) + if err != nil || reader == nil { + return "", false + } + defer reader.Close() + b, err := io.ReadAll(io.LimitReader(reader, maxPointerBytes)) + if err != nil { + return "", false + } + commit := strings.TrimSpace(string(b)) + return commit, commit != "" +} + +// immutableSnapshotPath builds the content-addressed snapshot URL for a repo, +// advertised to clients so they can fetch byte ranges in parallel. +func immutableSnapshotPath(upstreamURL, commit string) string { + repoPath, err := gitclone.RepoPathFromURL(upstreamURL) + if err != nil { + return "" + } + return fmt.Sprintf("/git/%s/snapshot/%s.tar.zst", repoPath, commit) +} + // cloneForSnapshot clones the mirror into destDir under repo's read lock, // then fixes the remote URL to point through cachew (or upstream). func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Repository, destDir string) error { @@ -133,22 +190,41 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone mu.Lock() defer mu.Unlock() - cacheKey := snapshotCacheKey(upstream) + var headSHA string if err := s.withSnapshotClone(ctx, repo, "base", func(workDir string) error { - // Capture the snapshot's HEAD so we can later build a delta bundle between - // the cached snapshot and the current mirror state. - headSHA, err := revParse(ctx, workDir, "HEAD") + // Capture the snapshot's HEAD: it both content-addresses the immutable + // blob and lets clients build a delta bundle between the snapshot and the + // current mirror state. + sha, err := revParse(ctx, workDir, "HEAD") if err != nil { return errors.Wrap(err, "rev-parse HEAD for snapshot") } + headSHA = sha + + // Content-addressed blobs are write-once: a re-run for an unchanged HEAD + // would otherwise overwrite the blob with a byte-different archive (file + // mtimes, .git metadata), which could corrupt an in-flight parallel range + // download — the race this route exists to prevent. Skip if it exists. + commitKey := snapshotCommitCacheKey(upstream, headSHA) + if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil { + return nil + } else if !errors.Is(statErr, os.ErrNotExist) { + return errors.Wrap(statErr, "stat snapshot blob") + } + extraHeaders := http.Header{} extraHeaders.Set("X-Cachew-Snapshot-Commit", headSHA) - return snapshot.Create(ctx, s.cache, cacheKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders) + return snapshot.Create(ctx, s.cache, commitKey, workDir, 0, nil, s.config.ZstdThreads, extraHeaders) }); err != nil { return errors.Wrap(err, "create snapshot") } + // Flip the pointer only after the blob is durably written. + if err := s.writeSnapshotPointer(ctx, upstream, headSHA); err != nil { + return err + } + s.metrics.recordOperation(ctx, "snapshot", "success", time.Since(start)) logger.InfoContext(ctx, "Snapshot generation completed", "upstream", upstream) return nil @@ -246,7 +322,31 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, return } + // Resolve the content-addressed snapshot for this repo. Once a snapshot has + // been generated, every serve path below reads the immutable blob, and we + // advertise its stable URL via Content-Location so parallel-download clients + // can fetch byte ranges against an unchanging object. The legacy key is only + // used before the first snapshot exists (cold start), where the spool path + // generates one and writes the pointer. cacheKey := snapshotCacheKey(upstreamURL) + var immutableLoc string + if commit, ok := s.readSnapshotPointer(ctx, upstreamURL); ok { + commitKey := snapshotCommitCacheKey(upstreamURL, commit) + // The pointer can outlive a blob that aged out, so only treat the + // immutable key as servable when it's present. + if _, statErr := s.cache.Stat(ctx, commitKey); statErr == nil { + cacheKey = commitKey + immutableLoc = immutableSnapshotPath(upstreamURL, commit) + } + } + // Advertise the immutable URL only from a path that actually opened the + // immutable blob, so an eviction between the Stat above and the open never + // leaves clients pointed at a URL that 404s. + setImmutableLocation := func() { + if immutableLoc != "" { + w.Header().Set("Content-Location", immutableLoc) + } + } // On cold start the local mirror may not be ready yet. Check the S3 cache // first so we can stream a cached snapshot to the client immediately while @@ -265,6 +365,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, winner.serving.Done() }() logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL) + setImmutableLocation() w.Header().Set("Content-Type", "application/zstd") n, err := serveReaderFast(w, r, reader) s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) @@ -284,6 +385,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, reader, _, openErr := s.cache.Open(ctx, cacheKey) if openErr == nil && reader != nil { logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL) + setImmutableLocation() w.Header().Set("Content-Type", "application/zstd") n, err := serveReaderFast(w, r, reader) s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) @@ -329,6 +431,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } defer reader.Close() + setImmutableLocation() if err := s.serveSnapshotWithBundle(ctx, w, r, reader, headers, repo, upstreamURL, repoName, start); err != nil { logger.ErrorContext(ctx, "Failed to serve snapshot", "upstream", upstreamURL, "error", err) span.RecordError(err) @@ -336,6 +439,89 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } } +// handleImmutableSnapshotRequest serves a content-addressed snapshot blob at +// /git//snapshot/.tar.zst. Because the blob is immutable, any +// replica returns identical bytes, so http.ServeContent's standard Range +// support lets a client download chunks in parallel and stitch them safely — +// no pin protocol, and an in-flight download survives a regeneration (which +// writes a different commit key) until the old blob ages out via TTL. +func (s *Strategy) handleImmutableSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { + start := time.Now() + repoPath, commit, ok := parseImmutableSnapshotPath(pathValue) + if !ok { + http.Error(w, "malformed immutable snapshot path", http.StatusBadRequest) + return + } + upstreamURL := "https://" + host + "/" + repoPath + repoName := host + "/" + repoPath + + ctx, span := tracer.Start(r.Context(), "git.snapshot.serve_immutable", + trace.WithAttributes( + attribute.String("cachew.operation", "snapshot_serve_immutable"), + attribute.String("cachew.upstream", upstreamURL), + attribute.String("cachew.repository", repoName), + attribute.String("cachew.commit", commit), + ), + ) + defer span.End() + r = r.WithContext(ctx) + logger := logging.FromContext(ctx) + + reader, headers, err := s.cache.Open(ctx, snapshotCommitCacheKey(upstreamURL, commit)) + if err != nil || reader == nil { + // Unknown commit or aged out of cache; the client should re-resolve via + // the stable snapshot URL, which regenerates and updates the pointer. + http.Error(w, "snapshot revision not available", http.StatusNotFound) + return + } + defer reader.Close() + + for key, values := range headers { + for _, value := range values { + w.Header().Add(key, value) + } + } + // The blob is immutable, so it is safe to cache aggressively downstream. + w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") + // Advertise range support only on the disk tier, where http.ServeContent can + // satisfy ranges via sendfile. The cold S3 path streams a full body and can't + // honor ranges, so claiming it would force clients into a fail-closed restart. + if _, onDisk := reader.(*os.File); onDisk { + w.Header().Set("Accept-Ranges", "bytes") + } + if w.Header().Get("Content-Type") == "" { + w.Header().Set("Content-Type", "application/zstd") + } + + n, err := serveReaderFast(w, r, reader) + s.metrics.recordSnapshotServe(ctx, "immutable", repoName, n, time.Since(start)) + span.SetAttributes(attribute.String("cachew.source", "immutable"), attribute.Int64("cachew.bytes", n)) + if err != nil { + logger.WarnContext(ctx, "Failed to stream immutable snapshot", "upstream", upstreamURL, "error", err) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } +} + +// parseImmutableSnapshotPath splits "/snapshot/.tar.zst" into the +// repo path and commit. The commit must be non-empty and slash-free. +func parseImmutableSnapshotPath(pathValue string) (repoPath, commit string, ok bool) { + const infix = "/snapshot/" + idx := strings.LastIndex(pathValue, infix) + if idx < 0 { + return "", "", false + } + commit = strings.TrimSuffix(pathValue[idx+len(infix):], ".tar.zst") + if commit == "" || strings.Contains(commit, "/") { + return "", "", false + } + repoPath = ExtractRepoPath(pathValue[:idx]) + if repoPath == "" { + return "", "", false + } + return repoPath, commit, true +} + func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header) error { for key, values := range headers { for _, value := range values { diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 573c59f..f192f0e 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -209,8 +209,10 @@ func TestSnapshotGenerationViaLocalClone(t *testing.T) { err = s.GenerateAndUploadSnapshot(ctx, repo) assert.NoError(t, err) - // Verify snapshot was uploaded to cache. - cacheKey := cache.NewKey(upstreamURL + ".snapshot") + // Verify snapshot was uploaded under the content-addressed key + pointer. + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + cacheKey := git.SnapshotCommitCacheKey(upstreamURL, commit) _, headers, err := memCache.Open(ctx, cacheKey) assert.NoError(t, err) assert.Equal(t, "application/zstd", headers.Get("Content-Type")) @@ -276,7 +278,9 @@ func TestSnapshotGenerationIncludesTrackedLockFiles(t *testing.T) { err = s.GenerateAndUploadSnapshot(ctx, repo) assert.NoError(t, err) - cacheKey := cache.NewKey(upstreamURL + ".snapshot") + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + cacheKey := git.SnapshotCommitCacheKey(upstreamURL, commit) restoreDir := filepath.Join(tmpDir, "restored") err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir, 0) assert.NoError(t, err) @@ -786,7 +790,9 @@ func TestSnapshotRemoteURLUsesUpstreamURL(t *testing.T) { err = s.GenerateAndUploadSnapshot(ctx, repo) assert.NoError(t, err) - cacheKey := cache.NewKey(upstreamURL + ".snapshot") + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + cacheKey := git.SnapshotCommitCacheKey(upstreamURL, commit) restoreDir := filepath.Join(tmpDir, "restored") err = snapshot.Restore(ctx, memCache, cacheKey, restoreDir, 0) assert.NoError(t, err) @@ -796,3 +802,258 @@ func TestSnapshotRemoteURLUsesUpstreamURL(t *testing.T) { assert.NoError(t, err, string(output)) assert.Equal(t, upstreamURL+"\n", string(output)) } + +func TestParseImmutableSnapshotPath(t *testing.T) { + for _, tc := range []struct { + name string + path string + repoPath string + commit string + ok bool + }{ + {"valid", "org/repo/snapshot/abc123.tar.zst", "org/repo", "abc123", true}, + {"nested repo", "org/team/repo/snapshot/deadbeef.tar.zst", "org/team/repo", "deadbeef", true}, + {"no infix", "org/repo/snapshot.tar.zst", "", "", false}, + {"empty commit", "org/repo/snapshot/.tar.zst", "", "", false}, + {"slash in commit", "org/repo/snapshot/a/b.tar.zst", "", "", false}, + } { + t.Run(tc.name, func(t *testing.T) { + repoPath, commit, ok := git.ParseImmutableSnapshotPath(tc.path) + assert.Equal(t, tc.ok, ok) + assert.Equal(t, tc.repoPath, repoPath) + assert.Equal(t, tc.commit, commit) + }) + } +} + +// TestStableSnapshotRoutingForRepoNamedSnapshot guards against the immutable +// route (matched by "/snapshot/" + ".tar.zst") swallowing the stable snapshot +// request of a repo whose own path ends in "snapshot". +func TestStableSnapshotRoutingForRepoNamedSnapshot(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/snapshot" + + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "snapshot") + createTestMirrorRepo(t, mirrorPath) + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + waitForReady(t, s) + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + handler := mux.handlers["GET /git/{host}/{path...}"] + path := "org/snapshot/snapshot.tar.zst" + req := httptest.NewRequest(http.MethodGet, "/git/github.com/"+path, nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", path) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + // The stable handler serves the snapshot; misrouting to the immutable + // handler would parse commit "snapshot" and 404. + assert.Equal(t, 200, w.Code) + assert.True(t, w.Body.Len() > 0) +} + +func TestImmutableSnapshotRouteServesBlob(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/repo" + + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + // A disk cache serves blobs as *os.File, exercising the http.ServeContent + // range path the immutable route depends on. + diskCache, err := cache.NewDisk(ctx, cache.DiskConfig{Root: filepath.Join(tmpDir, "cache"), MaxTTL: time.Hour, EvictInterval: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), diskCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + waitForReady(t, s) + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + + handler := mux.handlers["GET /git/{host}/{path...}"] + path := "org/repo/snapshot/" + commit + ".tar.zst" + req := httptest.NewRequest(http.MethodGet, "/git/github.com/"+path, nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", path) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.Equal(t, "bytes", w.Header().Get("Accept-Ranges")) + assert.Equal(t, "application/zstd", w.Header().Get("Content-Type")) + assert.True(t, w.Body.Len() > 0) + + // A range request against the immutable blob must return 206 so parallel + // clients can stitch byte ranges. + rangeReq := httptest.NewRequest(http.MethodGet, "/git/github.com/"+path, nil) + rangeReq = rangeReq.WithContext(ctx) + rangeReq.SetPathValue("host", "github.com") + rangeReq.SetPathValue("path", path) + rangeReq.Header.Set("Range", "bytes=0-99") + rangeW := httptest.NewRecorder() + handler.ServeHTTP(rangeW, rangeReq) + assert.Equal(t, http.StatusPartialContent, rangeW.Code) + assert.Equal(t, 100, rangeW.Body.Len()) + + // An unknown commit must 404 so the client re-resolves via the stable URL. + missPath := "org/repo/snapshot/0000000000000000000000000000000000000000.tar.zst" + missReq := httptest.NewRequest(http.MethodGet, "/git/github.com/"+missPath, nil) + missReq = missReq.WithContext(ctx) + missReq.SetPathValue("host", "github.com") + missReq.SetPathValue("path", missPath) + missW := httptest.NewRecorder() + + handler.ServeHTTP(missW, missReq) + assert.Equal(t, 404, missW.Code) +} + +// TestImmutableSnapshotColdTierOmitsAcceptRanges verifies the cold path (a +// non-file reader, e.g. an S3 stream) does not advertise range support, so +// clients don't attempt ranges the fallback copy can't honor. +func TestImmutableSnapshotColdTierOmitsAcceptRanges(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/repo" + + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + // Memory cache serves blobs as a non-file reader, modeling the cold S3 path. + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + waitForReady(t, s) + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + + handler := mux.handlers["GET /git/{host}/{path...}"] + path := "org/repo/snapshot/" + commit + ".tar.zst" + req := httptest.NewRequest(http.MethodGet, "/git/github.com/"+path, nil) + req = req.WithContext(ctx) + req.SetPathValue("host", "github.com") + req.SetPathValue("path", path) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + assert.Equal(t, 200, w.Code) + assert.Equal(t, "", w.Header().Get("Accept-Ranges")) +} + +// TestSnapshotWriteOnceForUnchangedHead verifies a re-run for an unchanged HEAD +// does not overwrite the existing content-addressed blob, which would otherwise +// corrupt an in-flight parallel range download. +func TestSnapshotWriteOnceForUnchangedHead(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{}) + tmpDir := t.TempDir() + mirrorRoot := filepath.Join(tmpDir, "mirrors") + upstreamURL := "https://github.com/org/repo" + + mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo") + createTestMirrorRepo(t, mirrorPath) + + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) + assert.NoError(t, err) + mux := newTestMux() + + cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) + s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + manager, err := cm() + assert.NoError(t, err) + repo, err := manager.GetOrCreate(ctx, upstreamURL) + assert.NoError(t, err) + + waitForReady(t, s) + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + commit, ok := s.ReadSnapshotPointer(ctx, upstreamURL) + assert.True(t, ok) + commitKey := git.SnapshotCommitCacheKey(upstreamURL, commit) + + // Replace the blob with a sentinel; a write-once regeneration must leave it + // untouched, whereas an overwrite would replace it with a real archive. + sentinel := []byte("SENTINEL-DO-NOT-OVERWRITE") + writer, err := memCache.Create(ctx, commitKey, http.Header{"Content-Type": {"application/zstd"}}, time.Hour) + assert.NoError(t, err) + _, err = writer.Write(sentinel) + assert.NoError(t, err) + assert.NoError(t, writer.Close()) + + // Regenerate with HEAD unchanged. + err = s.GenerateAndUploadSnapshot(ctx, repo) + assert.NoError(t, err) + + reader, _, err := memCache.Open(ctx, commitKey) + assert.NoError(t, err) + defer reader.Close() + got, err := io.ReadAll(reader) + assert.NoError(t, err) + assert.Equal(t, string(sentinel), string(got)) +}