From 704050ac1a4a652eab46efadbcfcddd8901b7432 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 2 Apr 2026 20:41:49 -0500 Subject: [PATCH] add Config.ReindexerIndexNames Some Pro features add high-churn tables that would benefit from the reindexer. As of now, however, there's no way to extend it to run on anything but the default index list. Add `Config.ReindexerIndexNames` so callers can provide the exact list to reindex, and export `ReindexerIndexNamesDefault()` so integrations can start from the built-in targets without reaching into internal maintenance code. Thread the configured names into `maintenance.Reindexer` and filter them through `IndexesExist` before starting work. That lets mixed-version or partially migrated installs skip absent indexes instead of trying to rebuild objects that are not there. Preserve the `nil` versus non-`nil` contract in `WithDefaults` by copying the slice without collapsing an explicit empty override back to `nil`, so `[]string{}` still means "reindex nothing". When `IndexesExist` fails during reindex discovery, advance the next scheduled run before resetting the timer. The old code reset against the already-fired `nextRunAt`, which made `time.Until(nextRunAt)` zero or negative and caused immediate retries in a tight error loop. Scheduling from the prior run time preserves the configured cadence after transient discovery failures. The added tests cover the exported default list, exact override propagation, explicit empty overrides, missing-index filtering, and the discovery-error retry path. 
--- CHANGELOG.md | 4 + client.go | 51 ++++++++-- client_test.go | 96 +++++++++++++++++-- insert_opts.go | 2 +- internal/maintenance/reindexer.go | 78 +++++++++++---- internal/maintenance/reindexer_test.go | 127 +++++++++++++++++++++++-- 6 files changed, 311 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed2d4a97..6bb75499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194). + ### Fixed - Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184). diff --git a/client.go b/client.go index 6d810d89..28c0ab61 100644 --- a/client.go +++ b/client.go @@ -52,7 +52,19 @@ const ( QueueNumWorkersMax = 10_000 ) -var postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) +var ( + postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + + reindexerIndexNamesDefault = []string{ //nolint:gochecknoglobals + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + } +) // TestConfig contains configuration specific to test environments. type TestConfig struct { @@ -272,6 +284,11 @@ type Config struct { // reindexer will run at midnight UTC every day. ReindexerSchedule PeriodicSchedule + // ReindexerIndexNames customizes which indexes River periodically reindexes. + // If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided + // slice is used as the exact list. 
+ ReindexerIndexNames []string + // ReindexerTimeout is the amount of time to wait for the reindexer to run a // single reindex operation before cancelling it via context. Set to -1 to // disable the timeout. @@ -374,12 +391,26 @@ type Config struct { schedulerInterval time.Duration } +// ReindexerIndexNamesDefault returns the default set of indexes reindexed by River. +func ReindexerIndexNamesDefault() []string { + indexNames := make([]string, len(reindexerIndexNamesDefault)) + copy(indexNames, reindexerIndexNamesDefault) + + return indexNames +} + // WithDefaults returns a copy of the Config with all default values applied. func (c *Config) WithDefaults() *Config { if c == nil { c = &Config{} } + reindexerIndexNames := ReindexerIndexNamesDefault() + if c.ReindexerIndexNames != nil { + reindexerIndexNames = make([]string, len(c.ReindexerIndexNames)) + copy(reindexerIndexNames, c.ReindexerIndexNames) + } + // Use the existing logger if set, otherwise create a default one. logger := c.Logger if logger == nil { @@ -420,6 +451,7 @@ func (c *Config) WithDefaults() *Config { PeriodicJobs: c.PeriodicJobs, PollOnly: c.PollOnly, Queues: c.Queues, + ReindexerIndexNames: reindexerIndexNames, ReindexerSchedule: c.ReindexerSchedule, ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault), RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter), @@ -603,14 +635,14 @@ type Client[TTx any] struct { notifier *notifier.Notifier // may be nil in poll-only mode periodicJobs *PeriodicJobBundle pilot riverpilot.Pilot - producersByQueueName map[string]*producer - queueMaintainer *maintenance.QueueMaintainer - queueMaintainerLeader *maintenance.QueueMaintainerLeader - queues *QueueBundle - services []startstop.Service - stopped <-chan struct{} - subscriptionManager *subscriptionManager - testSignals clientTestSignals + producersByQueueName map[string]*producer + queueMaintainer *maintenance.QueueMaintainer + queueMaintainerLeader 
*maintenance.QueueMaintainerLeader + queues *QueueBundle + services []startstop.Service + stopped <-chan struct{} + subscriptionManager *subscriptionManager + testSignals clientTestSignals // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. @@ -936,6 +968,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{ + IndexNames: config.ReindexerIndexNames, ScheduleFunc: scheduleFunc, Schema: config.Schema, Timeout: config.ReindexerTimeout, diff --git a/client_test.go b/client_test.go index 87d140c9..24a8be7f 100644 --- a/client_test.go +++ b/client_test.go @@ -7185,6 +7185,39 @@ func Test_Client_Start_Error(t *testing.T) { }) } +func Test_Config_WithDefaults(t *testing.T) { + t.Parallel() + + t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) { + t.Parallel() + + config := (&Config{ReindexerIndexNames: []string{}}).WithDefaults() + + require.NotNil(t, config.ReindexerIndexNames) + require.Empty(t, config.ReindexerIndexNames) + }) + + t.Run("ReindexerIndexNamesNilGetsDefaults", func(t *testing.T) { + t.Parallel() + + config := (&Config{}).WithDefaults() + + require.Equal(t, ReindexerIndexNamesDefault(), config.ReindexerIndexNames) + }) + + t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) { + t.Parallel() + + input := []string{"custom_index", "other_index"} + config := (&Config{ReindexerIndexNames: input}).WithDefaults() + + require.Equal(t, input, config.ReindexerIndexNames) + + input[0] = "mutated" + require.Equal(t, []string{"custom_index", "other_index"}, config.ReindexerIndexNames) + }) +} + func Test_NewClient_BaseServiceName(t *testing.T) { t.Parallel() @@ -7279,13 +7312,8 @@ func Test_NewClient_Defaults(t *testing.T) { require.False(t, enqueuer.StaggerStartupIsDisabled()) reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) - 
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_kind") - require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey") - require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index") - require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx") + require.Equal(t, ReindexerIndexNamesDefault(), client.config.ReindexerIndexNames) + require.Equal(t, ReindexerIndexNamesDefault(), reindexer.Config.IndexNames) now := time.Now().UTC() nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1) require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now)) @@ -7349,6 +7377,7 @@ func Test_NewClient_Overrides(t *testing.T) { Logger: logger, MaxAttempts: 5, Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, + ReindexerIndexNames: []string{"custom_index", "other_index"}, ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour}, ReindexerTimeout: 125 * time.Millisecond, RetryPolicy: retryPolicy, @@ -7373,6 +7402,8 @@ func Test_NewClient_Overrides(t *testing.T) { require.True(t, enqueuer.StaggerStartupIsDisabled()) reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) + // Assert the exact list so index list changes require explicit test updates. 
+ require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames) now := time.Now().UTC() require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now)) @@ -7391,6 +7422,37 @@ func Test_NewClient_Overrides(t *testing.T) { require.Len(t, client.config.WorkerMiddleware, 1) } +func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + + workers := NewWorkers() + AddWorker(workers, &noOpWorker{}) + + client, err := NewClient(driver, &Config{ + Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, + ReindexerIndexNames: []string{}, + Schema: schema, + TestOnly: true, + Workers: workers, + }) + require.NoError(t, err) + + require.NotNil(t, client.config.ReindexerIndexNames) + require.Empty(t, client.config.ReindexerIndexNames) + + reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer) + require.NotNil(t, reindexer.Config.IndexNames) + require.Empty(t, reindexer.Config.IndexNames) +} + func Test_NewClient_MissingParameters(t *testing.T) { t.Parallel() @@ -7817,6 +7879,26 @@ func Test_NewClient_Validations(t *testing.T) { } } +func TestReindexerIndexNamesDefault(t *testing.T) { + t.Parallel() + + indexNames := ReindexerIndexNamesDefault() + + // Assert the exact list so index list changes require explicit test updates. 
+ require.Equal(t, []string{ + "river_job_args_index", + "river_job_kind", + "river_job_metadata_index", + "river_job_pkey", + "river_job_prioritized_fetching_index", + "river_job_state_and_finalized_at_index", + "river_job_unique_idx", + }, indexNames) + + indexNames[0] = "mutated" + require.Equal(t, "river_job_args_index", ReindexerIndexNamesDefault()[0]) +} + type timeoutTestArgs struct { TimeoutValue time.Duration `json:"timeout_value"` } diff --git a/insert_opts.go b/insert_opts.go index 032e235e..bc1443d8 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals // what to do about the job that can't be scheduled. We can't send feedback to // the caller at this point, so probably the best we could do is leave it in // this untransitionable state until the `running` job finished, which isn't -// particularly satsifactory. +// particularly satisfactory. var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals rivertype.JobStateAvailable, rivertype.JobStatePending, diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index 7ff4bde9..6eb09955 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -29,16 +29,6 @@ const ( ReindexerTimeoutDefault = 1 * time.Minute ) -var defaultIndexNames = []string{ //nolint:gochecknoglobals - "river_job_args_index", - "river_job_kind", - "river_job_metadata_index", - "river_job_pkey", - "river_job_prioritized_fetching_index", - "river_job_state_and_finalized_at_index", - "river_job_unique_idx", -} - // ReindexerTestSignals are internal signals used exclusively in tests. 
type ReindexerTestSignals struct { Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes @@ -49,7 +39,8 @@ func (ts *ReindexerTestSignals) Init(tb testutil.TestingTB) { } type ReindexerConfig struct { - // IndexNames is a list of indexes to reindex on each run. + // IndexNames is the exact list of indexes to reindex on each run. It must + // be non-nil. An empty slice disables reindex work. IndexNames []string // ScheduleFunc returns the next scheduled run time for the reindexer given the @@ -66,6 +57,9 @@ type ReindexerConfig struct { } func (c *ReindexerConfig) mustValidate() *ReindexerConfig { + if c.IndexNames == nil { + panic("ReindexerConfig.IndexNames must be set") + } if c.ScheduleFunc == nil { panic("ReindexerConfig.ScheduleFunc must be set") } @@ -91,11 +85,13 @@ type Reindexer struct { } func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer { - indexNames := defaultIndexNames - if config.IndexNames != nil { - indexNames = config.IndexNames + if config.IndexNames == nil { + panic("ReindexerConfig.IndexNames must be set") } + indexNames := make([]string, len(config.IndexNames)) + copy(indexNames, config.IndexNames) + scheduleFunc := config.ScheduleFunc if scheduleFunc == nil { scheduleFunc = (&DefaultReindexerSchedule{}).Next @@ -133,11 +129,27 @@ func (s *Reindexer) Start(ctx context.Context) error { s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt)) timerUntilNextRun := time.NewTimer(time.Until(nextRunAt)) + scheduleNextRun := func() { + // Advance from the previous scheduled time, not "now", so retries + // stay aligned with the configured cadence and don't immediately + // refire after a timer that has already elapsed. 
+ nextRunAt = s.Config.ScheduleFunc(nextRunAt) + timerUntilNextRun.Reset(time.Until(nextRunAt)) + } for { select { case <-timerUntilNextRun.C: - for _, indexName := range s.Config.IndexNames { + reindexableIndexNames, err := s.reindexableIndexNames(ctx) + if err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error())) + } + scheduleNextRun() + continue + } + + for _, indexName := range reindexableIndexNames { if _, err := s.reindexOne(ctx, indexName); err != nil { if !errors.Is(err, context.Canceled) { s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName)) @@ -151,15 +163,11 @@ func (s *Reindexer) Start(ctx context.Context) error { // On each run, we calculate the new schedule based on the // previous run's start time. This ensures that we don't // accidentally skip a run as time elapses during the run. - nextRunAt = s.Config.ScheduleFunc(nextRunAt) + scheduleNextRun() // TODO: maybe we should log differently if some of these fail? s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully, - slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames))) - - // Reset the timer after the insert loop has finished so it's - // paused during work. Makes its firing more deterministic. - timerUntilNextRun.Reset(time.Until(nextRunAt)) + slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames))) case <-ctx.Done(): // Clean up timer resources. 
We know it has _not_ received from @@ -176,6 +184,34 @@ func (s *Reindexer) Start(ctx context.Context) error { return nil } +func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) { + indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{ + IndexNames: s.Config.IndexNames, + Schema: s.Config.Schema, + }) + if err != nil { + return nil, err + } + + indexNames := make([]string, 0, len(s.Config.IndexNames)) + missingIndexNames := make([]string, 0) + for _, indexName := range s.Config.IndexNames { + if indexesExist[indexName] { + indexNames = append(indexNames, indexName) + continue + } + + missingIndexNames = append(missingIndexNames, indexName) + } + + if len(missingIndexNames) > 0 { + s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames", + slog.Any("index_names", missingIndexNames)) + } + + return indexNames, nil +} + func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) { var cancel func() if s.Config.Timeout > -1 { diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 04f28cf3..a78dbd73 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -2,7 +2,9 @@ package maintenance import ( "context" + "errors" "fmt" + "sync/atomic" "testing" "time" @@ -16,15 +18,43 @@ import ( "github.com/riverqueue/river/rivershared/startstoptest" ) +type reindexerExecutorMock struct { + riverdriver.Executor + + indexesExistCalls atomic.Int32 + indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) + indexesExistSignal chan struct{} +} + +func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock { + return &reindexerExecutorMock{ + Executor: exec, + indexesExistFunc: exec.IndexesExist, + indexesExistSignal: make(chan struct{}, 10), + } +} + +func (m *reindexerExecutorMock) 
IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { + m.indexesExistCalls.Add(1) + + select { + case m.indexesExistSignal <- struct{}{}: + default: + } + + return m.indexesExistFunc(ctx, params) +} + func TestReindexer(t *testing.T) { t.Parallel() ctx := context.Background() type testBundle struct { - exec riverdriver.Executor - now time.Time - schema string + exec riverdriver.Executor + indexNames []string + now time.Time + schema string } setup := func(t *testing.T) (*Reindexer, *testBundle) { @@ -37,8 +67,9 @@ func TestReindexer(t *testing.T) { ) bundle := &testBundle{ - exec: riverpgxv5.New(dbPool).GetExecutor(), - schema: schema, + exec: riverpgxv5.New(dbPool).GetExecutor(), + indexNames: []string{"river_job_kind", "river_job_prioritized_fetching_index", "river_job_state_and_finalized_at_index"}, + schema: schema, } archetype := riversharedtest.BaseServiceArchetype(t) @@ -51,6 +82,7 @@ func TestReindexer(t *testing.T) { } svc := NewReindexer(archetype, &ReindexerConfig{ + IndexNames: bundle.indexNames, ScheduleFunc: fromNow(500 * time.Millisecond), Schema: schema, }, bundle.exec) @@ -67,6 +99,8 @@ func TestReindexer(t *testing.T) { if alreadyRan { return t.Add(time.Hour) } + // Force the first run immediately, then make the next legitimate + // schedule far enough away that an immediate retry is clearly wrong. 
alreadyRan = true return t.Add(time.Millisecond) } @@ -127,6 +161,22 @@ func TestReindexer(t *testing.T) { require.True(t, requireReindexOne(indexName)) }) + t.Run("ReindexableIndexNamesSkipsMissingIndexes", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t) + + svc.Config.IndexNames = []string{ + "does_not_exist", + "river_job_kind", + "river_job_prioritized_fetching_index", + } + + indexNames, err := svc.reindexableIndexNames(ctx) + require.NoError(t, err) + require.Equal(t, []string{"river_job_kind", "river_job_prioritized_fetching_index"}, indexNames) + }) + t.Run("ReindexesMinimalSubsetofIndexes", func(t *testing.T) { t.Parallel() @@ -163,7 +213,7 @@ func TestReindexer(t *testing.T) { } }) - t.Run("ReindexesDefaultIndexes", func(t *testing.T) { + t.Run("ReindexesConfiguredIndexes", func(t *testing.T) { t.Parallel() svc, _ := setup(t) @@ -243,16 +293,75 @@ func TestReindexer(t *testing.T) { riversharedtest.WaitOrTimeout(t, stopped) }) - t.Run("DefaultConfigs", func(t *testing.T) { + t.Run("CopiesConfiguredIndexNamesAndAppliesOtherDefaults", func(t *testing.T) { t.Parallel() svc, bundle := setup(t) - svc = NewReindexer(&svc.Archetype, &ReindexerConfig{}, bundle.exec) + input := []string{"river_job_kind"} + svc = NewReindexer(&svc.Archetype, &ReindexerConfig{IndexNames: input}, bundle.exec) - require.Equal(t, defaultIndexNames, svc.Config.IndexNames) + require.Equal(t, input, svc.Config.IndexNames) + input[0] = "mutated" + require.Equal(t, []string{"river_job_kind"}, svc.Config.IndexNames) require.Equal(t, ReindexerTimeoutDefault, svc.Config.Timeout) require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&DefaultReindexerSchedule{}).Next(bundle.now)) }) + + t.Run("PanicsOnNilIndexNames", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + require.PanicsWithValue(t, "ReindexerConfig.IndexNames must be set", func() { + NewReindexer(&svc.Archetype, &ReindexerConfig{}, bundle.exec) + }) + }) +} + +func 
TestReindexer_DiscoveryErrorSchedulesNextRun(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + + runImmediatelyThenOnceAnHour := func() func(time.Time) time.Time { + alreadyRan := false + return func(t time.Time) time.Time { + if alreadyRan { + return t.Add(time.Hour) + } + alreadyRan = true + return t.Add(time.Millisecond) + } + } + + execMock := newReindexerExecutorMock(driver.GetExecutor()) + execMock.indexesExistFunc = func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { + return nil, errors.New("indexes exist failed") + } + + svc := NewReindexer(riversharedtest.BaseServiceArchetype(t), &ReindexerConfig{ + IndexNames: []string{"river_job_kind"}, + ScheduleFunc: runImmediatelyThenOnceAnHour(), + Schema: schema, + }, execMock) + svc.Logger = riversharedtest.LoggerWarn(t) + svc.StaggerStartupDisable(true) + t.Cleanup(svc.Stop) + + require.NoError(t, svc.Start(ctx)) + riversharedtest.WaitOrTimeout(t, execMock.indexesExistSignal) + + select { + case <-execMock.indexesExistSignal: + require.FailNowf(t, "unexpected immediate retry", "IndexesExist was called %d times", execMock.indexesExistCalls.Load()) + case <-time.After(100 * time.Millisecond): + } } func TestDefaultReindexerSchedule(t *testing.T) {