diff --git a/CHANGELOG.md b/CHANGELOG.md index ed2d4a97..6bb75499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194). + ### Fixed - Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184). diff --git a/client.go b/client.go index 6d810d89..3cc3cf6f 100644 --- a/client.go +++ b/client.go @@ -272,6 +272,11 @@ type Config struct { // reindexer will run at midnight UTC every day. ReindexerSchedule PeriodicSchedule + // ReindexerIndexNames customizes which indexes River periodically reindexes. + // If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided + // slice is used as the exact list. + ReindexerIndexNames []string + // ReindexerTimeout is the amount of time to wait for the reindexer to run a // single reindex operation before cancelling it via context. Set to -1 to // disable the timeout. @@ -380,6 +385,12 @@ func (c *Config) WithDefaults() *Config { c = &Config{} } + var reindexerIndexNames []string + if c.ReindexerIndexNames != nil { + reindexerIndexNames = make([]string, len(c.ReindexerIndexNames)) + copy(reindexerIndexNames, c.ReindexerIndexNames) + } + // Use the existing logger if set, otherwise create a default one. logger := c.Logger if logger == nil { @@ -420,6 +431,7 @@ func (c *Config) WithDefaults() *Config { PeriodicJobs: c.PeriodicJobs, PollOnly: c.PollOnly, Queues: c.Queues, + ReindexerIndexNames: reindexerIndexNames, ReindexerSchedule: c.ReindexerSchedule, ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault), RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter), @@ -936,6 +948,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{ + IndexNames: config.ReindexerIndexNames, ScheduleFunc: scheduleFunc, Schema: config.Schema, Timeout: config.ReindexerTimeout, diff --git a/client_test.go b/client_test.go index 87d140c9..1c40cd6f 100644 --- a/client_test.go +++ b/client_test.go @@ -7185,6 +7185,39 @@ func Test_Client_Start_Error(t *testing.T) { }) } +func Test_Config_WithDefaults(t *testing.T) { + t.Parallel() + + t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) { + t.Parallel() + + config := (&Config{ReindexerIndexNames: []string{}}).WithDefaults() + + require.NotNil(t, config.ReindexerIndexNames) + require.Empty(t, config.ReindexerIndexNames) + }) + + t.Run("ReindexerIndexNamesNilStaysNil", func(t *testing.T) { + t.Parallel() + + config := (&Config{}).WithDefaults() + + require.Nil(t, config.ReindexerIndexNames) + }) + + t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) { + t.Parallel() + + input := []string{"custom_index", "other_index"} + config := (&Config{ReindexerIndexNames: input}).WithDefaults() + + require.Equal(t, input, config.ReindexerIndexNames) + + input[0] = "mutated" + require.Equal(t, []string{"custom_index", "other_index"}, config.ReindexerIndexNames) + }) +} + func Test_NewClient_BaseServiceName(t *testing.T) { t.Parallel() @@ -7279,13 +7312,16 @@ func Test_NewClient_Defaults(t *testing.T) { require.False(t, enqueuer.StaggerStartupIsDisabled()) reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) - require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_kind") - require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey") - require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx") + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, reindexer.Config.IndexNames) now := time.Now().UTC() nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1) require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now)) @@ -7349,6 +7385,7 @@ func Test_NewClient_Overrides(t *testing.T) { Logger: logger, MaxAttempts: 5, Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, + ReindexerIndexNames: []string{"custom_index", "other_index"}, ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour}, ReindexerTimeout: 125 * time.Millisecond, RetryPolicy: retryPolicy, @@ -7373,6 +7410,8 @@ func Test_NewClient_Overrides(t *testing.T) { require.True(t, enqueuer.StaggerStartupIsDisabled()) reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames) now := time.Now().UTC() require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now)) @@ -7391,6 +7430,37 @@ func Test_NewClient_Overrides(t *testing.T) { require.Len(t, client.config.WorkerMiddleware, 1) } +func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + + workers := NewWorkers() + AddWorker(workers, &noOpWorker{}) + + client, err := NewClient(driver, &Config{ + Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, + ReindexerIndexNames: []string{}, + Schema: schema, + TestOnly: true, + Workers: workers, + }) + require.NoError(t, err) + + require.NotNil(t, client.config.ReindexerIndexNames) + require.Empty(t, client.config.ReindexerIndexNames) + + reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) + require.NotNil(t, reindexer.Config.IndexNames) + require.Empty(t, reindexer.Config.IndexNames) +} + func Test_NewClient_MissingParameters(t *testing.T) { t.Parallel() diff --git a/insert_opts.go b/insert_opts.go index 032e235e..bc1443d8 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals // what to do about the job that can't be scheduled. We can't send feedback to // the caller at this point, so probably the best we could do is leave it in // this untransitionable state until the `running` job finished, which isn't -// particularly satsifactory. +// particularly satisfactory. var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals rivertype.JobStateAvailable, rivertype.JobStatePending, diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index 7ff4bde9..6fa506ea 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -39,6 +39,11 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals "river_job_unique_idx", } +// DefaultReindexerIndexNames returns the default set of indexes reindexed by River. +func DefaultReindexerIndexNames() []string { + return append([]string(nil), defaultIndexNames...) +} + // ReindexerTestSignals are internal signals used exclusively in tests. type ReindexerTestSignals struct { Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes @@ -133,11 +138,27 @@ func (s *Reindexer) Start(ctx context.Context) error { s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt)) timerUntilNextRun := time.NewTimer(time.Until(nextRunAt)) + scheduleNextRun := func() { + // Advance from the previous scheduled time, not "now", so retries + // stay aligned with the configured cadence and don't immediately + // refire after a timer that has already elapsed. + nextRunAt = s.Config.ScheduleFunc(nextRunAt) + timerUntilNextRun.Reset(time.Until(nextRunAt)) + } for { select { case <-timerUntilNextRun.C: - for _, indexName := range s.Config.IndexNames { + reindexableIndexNames, err := s.reindexableIndexNames(ctx) + if err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error())) + } + scheduleNextRun() + continue + } + + for _, indexName := range reindexableIndexNames { if _, err := s.reindexOne(ctx, indexName); err != nil { if !errors.Is(err, context.Canceled) { s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName)) @@ -151,15 +172,11 @@ func (s *Reindexer) Start(ctx context.Context) error { // On each run, we calculate the new schedule based on the // previous run's start time. This ensures that we don't // accidentally skip a run as time elapses during the run. - nextRunAt = s.Config.ScheduleFunc(nextRunAt) + scheduleNextRun() // TODO: maybe we should log differently if some of these fail? s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully, - slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames))) - - // Reset the timer after the insert loop has finished so it's - // paused during work. Makes its firing more deterministic. - timerUntilNextRun.Reset(time.Until(nextRunAt)) + slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames))) case <-ctx.Done(): // Clean up timer resources. We know it has _not_ received from @@ -176,6 +193,34 @@ func (s *Reindexer) Start(ctx context.Context) error { return nil } +func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) { + indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{ + IndexNames: s.Config.IndexNames, + Schema: s.Config.Schema, + }) + if err != nil { + return nil, err + } + + indexNames := make([]string, 0, len(s.Config.IndexNames)) + missingIndexNames := make([]string, 0) + for _, indexName := range s.Config.IndexNames { + if indexesExist[indexName] { + indexNames = append(indexNames, indexName) + continue + } + + missingIndexNames = append(missingIndexNames, indexName) + } + + if len(missingIndexNames) > 0 { + s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames", + slog.Any("index_names", missingIndexNames)) + } + + return indexNames, nil +} + func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) { var cancel func() if s.Config.Timeout > -1 { diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 04f28cf3..58e4ff62 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -2,7 +2,9 @@ package maintenance import ( "context" + "errors" "fmt" + "sync/atomic" "testing" "time" @@ -16,6 +18,33 @@ import ( "github.com/riverqueue/river/rivershared/startstoptest" ) +type reindexerExecutorMock struct { + riverdriver.Executor + + indexesExistCalls atomic.Int32 + indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) + indexesExistSignal chan struct{} +} + +func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock { + return &reindexerExecutorMock{ + Executor: exec, + indexesExistFunc: exec.IndexesExist, + indexesExistSignal: make(chan struct{}, 10), + } +} + +func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { + m.indexesExistCalls.Add(1) + + select { + case m.indexesExistSignal <- struct{}{}: + default: + } + + return m.indexesExistFunc(ctx, params) +} + func TestReindexer(t *testing.T) { t.Parallel() @@ -67,6 +96,8 @@ func TestReindexer(t *testing.T) { if alreadyRan { return t.Add(time.Hour) } + // Force the first run immediately, then make the next legitimate + // schedule far enough away that an immediate retry is clearly wrong. alreadyRan = true return t.Add(time.Millisecond) } @@ -127,6 +158,22 @@ func TestReindexer(t *testing.T) { require.True(t, requireReindexOne(indexName)) }) + t.Run("ReindexableIndexNamesSkipsMissingIndexes", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t) + + svc.Config.IndexNames = []string{ + "does_not_exist", + "river_job_kind", + "river_job_prioritized_fetching_index", + } + + indexNames, err := svc.reindexableIndexNames(ctx) + require.NoError(t, err) + require.Equal(t, []string{"river_job_kind", "river_job_prioritized_fetching_index"}, indexNames) + }) + t.Run("ReindexesMinimalSubsetofIndexes", func(t *testing.T) { t.Parallel() @@ -249,12 +296,66 @@ func TestReindexer(t *testing.T) { svc, bundle := setup(t) svc = NewReindexer(&svc.Archetype, &ReindexerConfig{}, bundle.exec) - require.Equal(t, defaultIndexNames, svc.Config.IndexNames) + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, svc.Config.IndexNames) require.Equal(t, ReindexerTimeoutDefault, svc.Config.Timeout) require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&DefaultReindexerSchedule{}).Next(bundle.now)) }) } +func TestReindexer_DiscoveryErrorSchedulesNextRun(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + + runImmediatelyThenOnceAnHour := func() func(time.Time) time.Time { + alreadyRan := false + return func(t time.Time) time.Time { + if alreadyRan { + return t.Add(time.Hour) + } + alreadyRan = true + return t.Add(time.Millisecond) + } + } + + execMock := newReindexerExecutorMock(driver.GetExecutor()) + execMock.indexesExistFunc = func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { + return nil, errors.New("indexes exist failed") + } + + svc := NewReindexer(riversharedtest.BaseServiceArchetype(t), &ReindexerConfig{ + ScheduleFunc: runImmediatelyThenOnceAnHour(), + Schema: schema, + }, execMock) + svc.Logger = riversharedtest.LoggerWarn(t) + svc.StaggerStartupDisable(true) + t.Cleanup(svc.Stop) + + require.NoError(t, svc.Start(ctx)) + riversharedtest.WaitOrTimeout(t, execMock.indexesExistSignal) + + select { + case <-execMock.indexesExistSignal: + require.FailNowf(t, "unexpected immediate retry", "IndexesExist was called %d times", execMock.indexesExistCalls.Load()) + case <-time.After(100 * time.Millisecond): + } +} + func TestDefaultReindexerSchedule(t *testing.T) { t.Parallel() diff --git a/reindexer.go b/reindexer.go new file mode 100644 index 00000000..47537b45 --- /dev/null +++ b/reindexer.go @@ -0,0 +1,8 @@ +package river + +import "github.com/riverqueue/river/internal/maintenance" + +// ReindexerIndexNamesDefault returns the default set of indexes reindexed by River. +func ReindexerIndexNamesDefault() []string { + return maintenance.DefaultReindexerIndexNames() +} diff --git a/reindexer_test.go b/reindexer_test.go new file mode 100644 index 00000000..edefa965 --- /dev/null +++ b/reindexer_test.go @@ -0,0 +1,27 @@ +package river + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReindexerIndexNamesDefault(t *testing.T) { + t.Parallel() + + indexNames := ReindexerIndexNamesDefault() + + // Assert the exact list so index list changes require explicit test updates. + require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, indexNames) + + indexNames[0] = "mutated" + require.Equal(t, "river_job_args_index", ReindexerIndexNamesDefault()[0]) +}