From 8a40c082c32324b9e1a4c9867c00596fd69bcb1b Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 27 May 2026 06:46:11 +0200 Subject: [PATCH] Add options to migrator `Validate` functions + option for `TargetVersion` This one in response to #1257. It's useful to validate the current state of the database's migrations, but you may not be fully on the latest version and could get an error. The included example is for Pro v0.24.0 which includes two migrations: one to be rolled out before deploy and one to be rolled out after. `Validate` would fail until the post-deploy migrations were run, which isn't flexible enough. Here, have `Validate` functions take a `ValidateOpts` similar to the `MigrateOpts` taken by the `Migrate` functions. `ValidateOpts` includes `TargetVersion`, which lets you specify a maximum version bound. Notably, this is a breaking change in that we add a new parameter to `Validate`. I'm a little on the fence about this, but doing it via breaking (1) avoids an extra pair of functions that pollute the API and need testing, (2) should be *mostly* low impact because I doubt all that many people are using `Validate`, and (3) the fix is super easy in that you just need to add a `nil` in the last parameter place if you don't want to use any new options. Fixes #1257. --- CHANGELOG.md | 6 +++ cmd/river/rivercli/driver_procurer.go | 2 +- cmd/river/rivercli/river_cli.go | 2 +- cmd/river/rivercli/river_cli_test.go | 6 +-- rivermigrate/river_migrate.go | 34 +++++++++++++--- rivermigrate/river_migrate_test.go | 58 ++++++++++++++++++++++++++- 6 files changed, 96 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b319a40..01e43314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) + +### Added + +- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) + ## [0.38.0] - 2026-05-22 ### Added diff --git a/cmd/river/rivercli/driver_procurer.go b/cmd/river/rivercli/driver_procurer.go index 40fc6ec9..29e4edce 100644 --- a/cmd/river/rivercli/driver_procurer.go +++ b/cmd/river/rivercli/driver_procurer.go @@ -42,7 +42,7 @@ type MigratorInterface interface { ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) GetVersion(version int) (rivermigrate.Migration, error) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) - Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) + Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) } type pgxV5DriverProcurer struct { diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index 0b0dedc4..fc1e55c8 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -708,7 +708,7 @@ func (c *validate) Run(ctx context.Context, opts *validateOpts) (bool, error) { return false, err } - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) if err != nil { return false, err } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 65e1134d..95ee4fe5 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -68,7 +68,7 @@ type MigratorStub struct { existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error) getVersionStub func(version int) (rivermigrate.Migration, error) migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) - validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error) + validateStub func(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) } func (m *MigratorStub) AllVersions() []rivermigrate.Migration { @@ -103,12 +103,12 @@ func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direc return m.migrateStub(ctx, direction, opts) } -func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { +func (m *MigratorStub) Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) { if m.validateStub == nil { panic("Validate is not stubbed") } - return m.validateStub(ctx) + return m.validateStub(ctx, opts) } var ( diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 12568e20..9b4db9a8 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -365,12 +365,20 @@ type ValidateResult struct { OK bool } +// ValidateOpts are options for a validate operation. +type ValidateOpts struct { + // TargetVersion is a specific migration version to validate up to. The + // version must exist. When set, validation only checks that migrations up + // to and including TargetVersion have been applied. + TargetVersion int +} + // Validate validates the current state of migrations, returning an unsuccessful // validation and usable message in case there are migrations that haven't yet // been applied. -func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) { +func (m *Migrator[TTx]) Validate(ctx context.Context, opts *ValidateOpts) (*ValidateResult, error) { return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*ValidateResult, error) { - return m.validate(ctx, tx) + return m.validate(ctx, tx, opts) }) } @@ -379,8 +387,8 @@ func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) { // been applied. // // This variant lets a caller validate within a transaction. -func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error) { - return m.validate(ctx, m.driver.UnwrapExecutor(tx)) +func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx, opts *ValidateOpts) (*ValidateResult, error) { + return m.validate(ctx, m.driver.UnwrapExecutor(tx), opts) } // migrateDown runs down migrations. @@ -472,7 +480,11 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor } // validate validates current migration state. -func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) (*ValidateResult, error) { +func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor, opts *ValidateOpts) (*ValidateResult, error) { + if opts == nil { + opts = &ValidateOpts{} + } + existingMigrations, err := m.existingMigrations(ctx, exec) if err != nil { return nil, err @@ -483,6 +495,18 @@ func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) delete(targetMigrations, migrateRow.Version) } + if opts.TargetVersion > 0 { + if _, ok := m.migrations[opts.TargetVersion]; !ok { + return nil, fmt.Errorf("version %d is not a valid River migration version", opts.TargetVersion) + } + + for version := range targetMigrations { + if version > opts.TargetVersion { + delete(targetMigrations, version) + } + } + } + notOKWithMessage := func(message string) *ValidateResult { m.Logger.InfoContext(ctx, m.Name+": "+message) return &ValidateResult{Messages: []string{message}} diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index ef44f29c..3ea3ffee 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -647,7 +647,37 @@ func TestMigrator(t *testing.T) { _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) + require.NoError(t, err) + require.Equal(t, &ValidateResult{OK: true}, res) + }) + + t.Run("ValidateSuccessWithTargetVersion", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + require.Equal(t, &ValidateResult{OK: true}, res) + }) + + t.Run("ValidateTxSuccessWithTargetVersion", func(t *testing.T) { + t.Parallel() + + const migrateVersionIncludingRiverJob = 2 + + migrator, bundle := setup(t) + + tx := testTx(t, bundle.driver) + + _, err := migrator.MigrateTx(ctx, tx, DirectionUp, &MigrateOpts{TargetVersion: migrateVersionIncludingRiverJob}) + require.NoError(t, err) + + res, err := migrator.ValidateTx(ctx, tx, &ValidateOpts{TargetVersion: migrateVersionIncludingRiverJob}) require.NoError(t, err) require.Equal(t, &ValidateResult{OK: true}, res) }) @@ -660,13 +690,37 @@ func TestMigrator(t *testing.T) { _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) require.NoError(t, err) - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) require.NoError(t, err) require.Equal(t, &ValidateResult{ Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion+1, migrationsBundle.MaxVersion+2)}, }, res) }) + t.Run("ValidateUnappliedMigrationsWithTargetVersion", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion - 1}) + require.NoError(t, err) + + res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) + require.NoError(t, err) + require.Equal(t, &ValidateResult{ + Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion, migrationsBundle.MaxVersion+1)}, + }, res) + }) + + t.Run("ValidateWithTargetVersionInvalid", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.WithTestVersionsMaxVersion + 77}) + require.EqualError(t, err, fmt.Sprintf("version %d is not a valid River migration version", migrationsBundle.WithTestVersionsMaxVersion+77)) + }) + t.Run("MigrateUpThenDownToZeroAndBackUp", func(t *testing.T) { t.Parallel()