diff --git a/internal/featureflags/featureflags.go b/internal/featureflags/featureflags.go new file mode 100644 index 0000000..f47ffc6 --- /dev/null +++ b/internal/featureflags/featureflags.go @@ -0,0 +1,38 @@ +// Package featureflags provides typed feature flags backed by environment +// variables. Each flag is read from an environment variable named +// CACHEW_FF_ (uppercased) and JSON-decoded into the flag's type. If the +// variable is unset, the flag returns its default value. Supported types are +// int and bool. +package featureflags + +import ( + "encoding/json" + "fmt" + "os" + "strings" +) + +type Type interface{ int | bool } + +type Flag[T Type] struct { + name string + dflt T +} + +func New[T Type](name string, dflt T) Flag[T] { + f := Flag[T]{name: name, dflt: dflt} + f.Get() // no-op Get to validate that the envar is valid. + return f +} + +func (f Flag[T]) Get() (out T) { + v, ok := os.LookupEnv("CACHEW_FF_" + strings.ToUpper(f.name)) + if !ok { + return f.dflt + } + err := json.Unmarshal([]byte(v), &out) + if err != nil { + panic(fmt.Sprintf("failed to unmarshal feature flag %s: %v", f.name, err)) + } + return out +} diff --git a/internal/featureflags/featureflags_test.go b/internal/featureflags/featureflags_test.go new file mode 100644 index 0000000..53a343a --- /dev/null +++ b/internal/featureflags/featureflags_test.go @@ -0,0 +1,38 @@ +package featureflags_test + +import ( + "testing" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/featureflags" +) + +func TestBoolFlagDefault(t *testing.T) { + f := featureflags.New("testbooldefault", true) + assert.Equal(t, true, f.Get()) +} + +func TestBoolFlagFromEnv(t *testing.T) { + t.Setenv("CACHEW_FF_TESTBOOLENV", "false") + f := featureflags.New("testboolenv", true) + assert.Equal(t, false, f.Get()) +} + +func TestIntFlagDefault(t *testing.T) { + f := featureflags.New("testintdefault", 42) + assert.Equal(t, 42, f.Get()) +} + +func TestIntFlagFromEnv(t *testing.T) { + t.Setenv("CACHEW_FF_TESTINTENV", "99") + f := featureflags.New("testintenv", 0) + assert.Equal(t, 99, f.Get()) +} + +func TestInvalidEnvPanics(t *testing.T) { + t.Setenv("CACHEW_FF_TESTINVALID", "notanint") + assert.Panics(t, func() { + featureflags.New("testinvalid", 0) + }) +} diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index ab689cd..61fe365 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -81,6 +81,7 @@ type RootScheduler struct { activeClones int maxCloneConcurrency int cancel context.CancelFunc + wg sync.WaitGroup store ScheduleStore metrics *schedulerMetrics } @@ -124,6 +125,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { } ctx, cancel := context.WithCancel(ctx) q.cancel = cancel + q.wg.Add(config.Concurrency) for id := range config.Concurrency { go q.worker(ctx, id) } @@ -194,7 +196,12 @@ func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.D return 0 } +// Wait blocks until all worker goroutines have exited. The caller should +// cancel the context passed to New first, otherwise Wait blocks forever. +func (q *RootScheduler) Wait() { q.wg.Wait() } + func (q *RootScheduler) worker(ctx context.Context, id int) { + defer q.wg.Done() logger := logging.FromContext(ctx).With("scheduler-worker", id) for { select { diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 98f607f..0e20096 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -16,6 +16,7 @@ import ( "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/gitclone" "github.com/block/cachew/internal/githubapp" + "github.com/block/cachew/internal/jobscheduler" "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/snapshot" "github.com/block/cachew/internal/strategy/git" @@ -578,15 +579,23 @@ func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) { } _, ctx := logging.Configure(context.Background(), logging.Config{}) + ctx, cancelAll := context.WithCancel(ctx) tmpDir := t.TempDir() mirrorRoot := filepath.Join(tmpDir, "mirrors") + sched, err := jobscheduler.New(ctx, jobscheduler.Config{}) + assert.NoError(t, err) + // Cancel the context and wait for all scheduler workers to drain before + // TempDir cleanup runs, preventing a race with background jobs. + t.Cleanup(func() { cancelAll(); sched.Wait() }) + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) mux := newTestMux() + schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil } cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) - _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) // Pre-populate the cache with a fake snapshot that has NO X-Cachew-Snapshot-Commit @@ -607,16 +616,16 @@ func TestColdSnapshotServesWithoutCommitHeader(t *testing.T) { // Use a cancelled context so ensureCloneReady fails quickly and the cold // path returns the cached snapshot. - cancelCtx, cancel := context.WithCancel(ctx) + reqCtx, cancelReq := context.WithCancel(ctx) req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/coldrepo/snapshot.tar.zst", nil) - req = req.WithContext(cancelCtx) + req = req.WithContext(reqCtx) req.SetPathValue("host", "github.com") req.SetPathValue("path", "org/coldrepo/snapshot.tar.zst") w := httptest.NewRecorder() handler.ServeHTTP(w, req) - cancel() + cancelReq() // Cold path should serve the snapshot but without X-Cachew-Snapshot-Commit, // signaling to the client that it needs to freshen. @@ -633,15 +642,21 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) { } _, ctx := logging.Configure(context.Background(), logging.Config{}) + ctx, cancelAll := context.WithCancel(ctx) tmpDir := t.TempDir() mirrorRoot := filepath.Join(tmpDir, "mirrors") + sched, err := jobscheduler.New(ctx, jobscheduler.Config{}) + assert.NoError(t, err) + t.Cleanup(func() { cancelAll(); sched.Wait() }) + memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) mux := newTestMux() + schedProvider := func() (*jobscheduler.RootScheduler, error) { return sched, nil } cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil) - _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, schedProvider, memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) // Pre-populate cache with a fake snapshot. @@ -677,9 +692,6 @@ func TestDeferredRestoreOnlyScheduledOnce(t *testing.T) { handler.ServeHTTP(w2, req2) // Second request may be 200 (from local cache) or 503 (clone not ready). // The key assertion is that it doesn't panic from double-scheduling. - - // Allow background goroutines to settle. - time.Sleep(time.Second) } func TestSnapshotRemoteURLUsesServerURL(t *testing.T) {