Skip to content

Commit b5767f9

Browse files
committed
add Config.ReindexerIndexNames
Some Pro features add high churn tables that would benefit from the reindexer. As of now, however, there's no way to extend it to run on anything but the default index list. Add `Config.ReindexerIndexNames` so callers can provide the exact list to reindex, and export `ReindexerIndexNamesDefault()` so integrations can start from the built-in targets without reaching into internal maintenance code. Thread the configured names into `maintenance.Reindexer` and filter them through `IndexesExist` before starting work. That lets mixed-version or partially migrated installs skip absent indexes instead of trying to rebuild objects that are not there. Preserve the `nil` versus non-`nil` contract in `WithDefaults` by copying the slice without collapsing an explicit empty override back to `nil`, so `[]string{}` still means "reindex nothing". When `IndexesExist` fails during reindex discovery, advance the next scheduled run before resetting the timer. The old code reset against the already-fired `nextRunAt`, which made `time.Until(nextRunAt)` zero or negative and caused immediate retries in a tight error loop. Scheduling from the prior run time preserves the configured cadence after transient discovery failures. The added tests cover the exported default list, exact override propagation, explicit empty overrides, missing-index filtering, and the discovery-error retry path.
1 parent b1f225e commit b5767f9

8 files changed

Lines changed: 284 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194).
13+
1014
### Changed
1115

1216
- Jobs erroring or panicking no longer logs at the error/warn level because this is not indicative of a problem inside of River itself. These log statements have been demoted to info. [PR #1190](https://github.com/riverqueue/river/pull/1190).

client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,11 @@ type Config struct {
273273
// reindexer will run at midnight UTC every day.
274274
ReindexerSchedule PeriodicSchedule
275275

276+
// ReindexerIndexNames customizes which indexes River periodically reindexes.
277+
// If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided
278+
// slice is used as the exact list.
279+
ReindexerIndexNames []string
280+
276281
// ReindexerTimeout is the amount of time to wait for the reindexer to run a
277282
// single reindex operation before cancelling it via context. Set to -1 to
278283
// disable the timeout.
@@ -381,6 +386,12 @@ func (c *Config) WithDefaults() *Config {
381386
c = &Config{}
382387
}
383388

389+
var reindexerIndexNames []string
390+
if c.ReindexerIndexNames != nil {
391+
reindexerIndexNames = make([]string, len(c.ReindexerIndexNames))
392+
copy(reindexerIndexNames, c.ReindexerIndexNames)
393+
}
394+
384395
// Use the existing logger if set, otherwise create a default one.
385396
logger := c.Logger
386397
if logger == nil {
@@ -421,6 +432,7 @@ func (c *Config) WithDefaults() *Config {
421432
PeriodicJobs: c.PeriodicJobs,
422433
PollOnly: c.PollOnly,
423434
Queues: c.Queues,
435+
ReindexerIndexNames: reindexerIndexNames,
424436
ReindexerSchedule: c.ReindexerSchedule,
425437
ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault),
426438
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
@@ -939,6 +951,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
939951
}
940952

941953
reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
954+
IndexNames: config.ReindexerIndexNames,
942955
ScheduleFunc: scheduleFunc,
943956
Schema: config.Schema,
944957
Timeout: config.ReindexerTimeout,

client_test.go

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7160,6 +7160,39 @@ func Test_Client_Start_Error(t *testing.T) {
71607160
})
71617161
}
71627162

7163+
func Test_Config_WithDefaults(t *testing.T) {
7164+
t.Parallel()
7165+
7166+
t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) {
7167+
t.Parallel()
7168+
7169+
config := (&Config{ReindexerIndexNames: []string{}}).WithDefaults()
7170+
7171+
require.NotNil(t, config.ReindexerIndexNames)
7172+
require.Empty(t, config.ReindexerIndexNames)
7173+
})
7174+
7175+
t.Run("ReindexerIndexNamesNilStaysNil", func(t *testing.T) {
7176+
t.Parallel()
7177+
7178+
config := (&Config{}).WithDefaults()
7179+
7180+
require.Nil(t, config.ReindexerIndexNames)
7181+
})
7182+
7183+
t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) {
7184+
t.Parallel()
7185+
7186+
input := []string{"custom_index", "other_index"}
7187+
config := (&Config{ReindexerIndexNames: input}).WithDefaults()
7188+
7189+
require.Equal(t, input, config.ReindexerIndexNames)
7190+
7191+
input[0] = "mutated"
7192+
require.Equal(t, []string{"custom_index", "other_index"}, config.ReindexerIndexNames)
7193+
})
7194+
}
7195+
71637196
func Test_NewClient_BaseServiceName(t *testing.T) {
71647197
t.Parallel()
71657198

@@ -7254,13 +7287,16 @@ func Test_NewClient_Defaults(t *testing.T) {
72547287
require.False(t, enqueuer.StaggerStartupIsDisabled())
72557288

72567289
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7257-
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index")
7258-
require.Contains(t, reindexer.Config.IndexNames, "river_job_kind")
7259-
require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index")
7260-
require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey")
7261-
require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index")
7262-
require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index")
7263-
require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx")
7290+
// Assert the exact list so index list changes require explicit test updates.
7291+
require.Equal(t, []string{
7292+
"river_job_args_index",
7293+
"river_job_kind",
7294+
"river_job_metadata_index",
7295+
"river_job_pkey",
7296+
"river_job_prioritized_fetching_index",
7297+
"river_job_state_and_finalized_at_index",
7298+
"river_job_unique_idx",
7299+
}, reindexer.Config.IndexNames)
72647300
now := time.Now().UTC()
72657301
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
72667302
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))
@@ -7324,6 +7360,7 @@ func Test_NewClient_Overrides(t *testing.T) {
73247360
Logger: logger,
73257361
MaxAttempts: 5,
73267362
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
7363+
ReindexerIndexNames: []string{"custom_index", "other_index"},
73277364
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
73287365
ReindexerTimeout: 125 * time.Millisecond,
73297366
RetryPolicy: retryPolicy,
@@ -7348,6 +7385,8 @@ func Test_NewClient_Overrides(t *testing.T) {
73487385
require.True(t, enqueuer.StaggerStartupIsDisabled())
73497386

73507387
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7388+
// Assert the exact list so index list changes require explicit test updates.
7389+
require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames)
73517390
now := time.Now().UTC()
73527391
require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now))
73537392

@@ -7366,6 +7405,37 @@ func Test_NewClient_Overrides(t *testing.T) {
73667405
require.Len(t, client.config.WorkerMiddleware, 1)
73677406
}
73687407

7408+
func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) {
7409+
t.Parallel()
7410+
7411+
ctx := context.Background()
7412+
7413+
var (
7414+
dbPool = riversharedtest.DBPool(ctx, t)
7415+
driver = riverpgxv5.New(dbPool)
7416+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
7417+
)
7418+
7419+
workers := NewWorkers()
7420+
AddWorker(workers, &noOpWorker{})
7421+
7422+
client, err := NewClient(driver, &Config{
7423+
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
7424+
ReindexerIndexNames: []string{},
7425+
Schema: schema,
7426+
TestOnly: true,
7427+
Workers: workers,
7428+
})
7429+
require.NoError(t, err)
7430+
7431+
require.NotNil(t, client.config.ReindexerIndexNames)
7432+
require.Empty(t, client.config.ReindexerIndexNames)
7433+
7434+
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7435+
require.NotNil(t, reindexer.Config.IndexNames)
7436+
require.Empty(t, reindexer.Config.IndexNames)
7437+
}
7438+
73697439
func Test_NewClient_MissingParameters(t *testing.T) {
73707440
t.Parallel()
73717441

insert_opts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
240240
// what to do about the job that can't be scheduled. We can't send feedback to
241241
// the caller at this point, so probably the best we could do is leave it in
242242
// this untransitionable state until the `running` job finished, which isn't
243-
// particularly satsifactory.
243+
// particularly satisfactory.
244244
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
245245
rivertype.JobStateAvailable,
246246
rivertype.JobStatePending,

internal/maintenance/reindexer.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals
3939
"river_job_unique_idx",
4040
}
4141

42+
// DefaultReindexerIndexNames returns the default set of indexes reindexed by River.
43+
func DefaultReindexerIndexNames() []string {
44+
return append([]string(nil), defaultIndexNames...)
45+
}
46+
4247
// ReindexerTestSignals are internal signals used exclusively in tests.
4348
type ReindexerTestSignals struct {
4449
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
@@ -133,11 +138,27 @@ func (s *Reindexer) Start(ctx context.Context) error {
133138
s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt))
134139

135140
timerUntilNextRun := time.NewTimer(time.Until(nextRunAt))
141+
scheduleNextRun := func() {
142+
// Advance from the previous scheduled time, not "now", so retries
143+
// stay aligned with the configured cadence and don't immediately
144+
// refire after a timer that has already elapsed.
145+
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
146+
timerUntilNextRun.Reset(time.Until(nextRunAt))
147+
}
136148

137149
for {
138150
select {
139151
case <-timerUntilNextRun.C:
140-
for _, indexName := range s.Config.IndexNames {
152+
reindexableIndexNames, err := s.reindexableIndexNames(ctx)
153+
if err != nil {
154+
if !errors.Is(err, context.Canceled) {
155+
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error()))
156+
}
157+
scheduleNextRun()
158+
continue
159+
}
160+
161+
for _, indexName := range reindexableIndexNames {
141162
if _, err := s.reindexOne(ctx, indexName); err != nil {
142163
if !errors.Is(err, context.Canceled) {
143164
s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName))
@@ -151,15 +172,11 @@ func (s *Reindexer) Start(ctx context.Context) error {
151172
// On each run, we calculate the new schedule based on the
152173
// previous run's start time. This ensures that we don't
153174
// accidentally skip a run as time elapses during the run.
154-
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
175+
scheduleNextRun()
155176

156177
// TODO: maybe we should log differently if some of these fail?
157178
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
158-
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames)))
159-
160-
// Reset the timer after the insert loop has finished so it's
161-
// paused during work. Makes its firing more deterministic.
162-
timerUntilNextRun.Reset(time.Until(nextRunAt))
179+
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames)))
163180

164181
case <-ctx.Done():
165182
// Clean up timer resources. We know it has _not_ received from
@@ -176,6 +193,34 @@ func (s *Reindexer) Start(ctx context.Context) error {
176193
return nil
177194
}
178195

196+
func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) {
197+
indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{
198+
IndexNames: s.Config.IndexNames,
199+
Schema: s.Config.Schema,
200+
})
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
indexNames := make([]string, 0, len(s.Config.IndexNames))
206+
missingIndexNames := make([]string, 0)
207+
for _, indexName := range s.Config.IndexNames {
208+
if indexesExist[indexName] {
209+
indexNames = append(indexNames, indexName)
210+
continue
211+
}
212+
213+
missingIndexNames = append(missingIndexNames, indexName)
214+
}
215+
216+
if len(missingIndexNames) > 0 {
217+
s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames",
218+
slog.Any("index_names", missingIndexNames))
219+
}
220+
221+
return indexNames, nil
222+
}
223+
179224
func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) {
180225
var cancel func()
181226
if s.Config.Timeout > -1 {

0 commit comments

Comments
 (0)