Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions internal/featureflags/featureflags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Package featureflags provides typed feature flags backed by environment
// variables. Each flag is read from an environment variable named
// CACHEW_FF_<NAME> (uppercased) and JSON-decoded into the flag's type. If the
// variable is unset, the flag returns its default value. Supported types are
// int and bool.
package featureflags

import (
"encoding/json"
"fmt"
"os"
"strings"
)

type Type interface{ int | bool }

type Flag[T Type] struct {
name string
dflt T
}

func New[T Type](name string, dflt T) Flag[T] {
f := Flag[T]{name: name, dflt: dflt}
f.Get() // no-op Get to validate that the envar is valid.
return f
}

func (f Flag[T]) Get() (out T) {
v, ok := os.LookupEnv("CACHEW_FF_" + strings.ToUpper(f.name))
if !ok {
return f.dflt
}
err := json.Unmarshal([]byte(v), &out)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal feature flag %s: %v", f.name, err))
}
return out
}
38 changes: 38 additions & 0 deletions internal/featureflags/featureflags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package featureflags_test

import (
"testing"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/featureflags"
)

func TestBoolFlagDefault(t *testing.T) {
f := featureflags.New("testbooldefault", true)
assert.Equal(t, true, f.Get())
}

func TestBoolFlagFromEnv(t *testing.T) {
t.Setenv("CACHEW_FF_TESTBOOLENV", "false")
f := featureflags.New("testboolenv", true)
assert.Equal(t, false, f.Get())
}

func TestIntFlagDefault(t *testing.T) {
f := featureflags.New("testintdefault", 42)
assert.Equal(t, 42, f.Get())
}

func TestIntFlagFromEnv(t *testing.T) {
t.Setenv("CACHEW_FF_TESTINTENV", "99")
f := featureflags.New("testintenv", 0)
assert.Equal(t, 99, f.Get())
}

func TestInvalidEnvPanics(t *testing.T) {
t.Setenv("CACHEW_FF_TESTINVALID", "notanint")
assert.Panics(t, func() {
featureflags.New("testinvalid", 0)
})
}
7 changes: 7 additions & 0 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type RootScheduler struct {
activeClones int
maxCloneConcurrency int
cancel context.CancelFunc
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
}
Expand Down Expand Up @@ -124,6 +125,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
}
ctx, cancel := context.WithCancel(ctx)
q.cancel = cancel
q.wg.Add(config.Concurrency)
for id := range config.Concurrency {
go q.worker(ctx, id)
}
Expand Down Expand Up @@ -194,7 +196,12 @@ func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.D
return 0
}

// Wait blocks until all worker goroutines have exited. The caller should
// cancel the context passed to New first, otherwise Wait blocks forever.
func (q *RootScheduler) Wait() { q.wg.Wait() }

func (q *RootScheduler) worker(ctx context.Context, id int) {
defer q.wg.Done()
logger := logging.FromContext(ctx).With("scheduler-worker", id)
for {
select {
Expand Down
28 changes: 20 additions & 8 deletions internal/strategy/git/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/gitclone"
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/snapshot"
"github.com/block/cachew/internal/strategy/git"
Expand Down Expand Up @@ -578,15 +579,23 @@ func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) {
}

_, ctx := logging.Configure(context.Background(), logging.Config{})
ctx, cancelAll := context.WithCancel(ctx)
tmpDir := t.TempDir()
mirrorRoot := filepath.Join(tmpDir, "mirrors")

sched, err := jobscheduler.New(ctx, jobscheduler.Config{})
assert.NoError(t, err)
// Cancel the context and wait for all scheduler workers to drain before
// TempDir cleanup runs, preventing a race with background jobs.
t.Cleanup(func() { cancelAll(); sched.Wait() })

memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
mux := newTestMux()

schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil }
cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

// Pre-populate the cache with a fake snapshot that has NO X-Cachew-Snapshot-Commit
Expand All @@ -607,16 +616,16 @@ func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) {

// Use a cancelled context so ensureCloneReady fails quickly and the cold
// path returns the cached snapshot.
cancelCtx, cancel := context.WithCancel(ctx)
reqCtx, cancelReq := context.WithCancel(ctx)

req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/coldrepo/snapshot.tar.zst", nil)
req = req.WithContext(cancelCtx)
req = req.WithContext(reqCtx)
req.SetPathValue("host", "github.com")
req.SetPathValue("path", "org/coldrepo/snapshot.tar.zst")
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)
cancel()
cancelReq()

// Cold path should serve the snapshot but without X-Cachew-Snapshot-Commit,
// signaling to the client that it needs to freshen.
Expand All @@ -633,15 +642,21 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) {
}

_, ctx := logging.Configure(context.Background(), logging.Config{})
ctx, cancelAll := context.WithCancel(ctx)
tmpDir := t.TempDir()
mirrorRoot := filepath.Join(tmpDir, "mirrors")

sched, err := jobscheduler.New(ctx, jobscheduler.Config{})
assert.NoError(t, err)
t.Cleanup(func() { cancelAll(); sched.Wait() })

memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
mux := newTestMux()

schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil }
cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

// Pre-populate cache with a fake snapshot.
Expand Down Expand Up @@ -677,9 +692,6 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) {
handler.ServeHTTP(w2, req2)
// Second request may be 200 (from local cache) or 503 (clone not ready).
// The key assertion is that it doesn't panic from double-scheduling.

// Allow background goroutines to settle.
time.Sleep(time.Second)
}

func TestSnapshotRemoteURLUsesServerURL(t *testing.T) {
Expand Down