diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b319a40..01e43314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) + +### Added + +- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) + ## [0.38.0] - 2026-05-22 ### Added diff --git a/cmd/river/rivercli/driver_procurer.go b/cmd/river/rivercli/driver_procurer.go index 40fc6ec9..29e4edce 100644 --- a/cmd/river/rivercli/driver_procurer.go +++ b/cmd/river/rivercli/driver_procurer.go @@ -42,7 +42,7 @@ type MigratorInterface interface { ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error) GetVersion(version int) (rivermigrate.Migration, error) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) - Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) + Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) } type pgxV5DriverProcurer struct { diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index 0b0dedc4..fc1e55c8 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -708,7 +708,7 @@ func (c *validate) Run(ctx context.Context, opts *validateOpts) (bool, error) { return false, err } - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) if err != nil { return false, err } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 65e1134d..95ee4fe5 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -68,7 +68,7 @@ type MigratorStub struct { existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error) getVersionStub func(version int) (rivermigrate.Migration, error) migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) - validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error) + validateStub func(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) } func (m *MigratorStub) AllVersions() []rivermigrate.Migration { @@ -103,12 +103,12 @@ func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direc return m.migrateStub(ctx, direction, opts) } -func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { +func (m *MigratorStub) Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) { if m.validateStub == nil { panic("Validate is not stubbed") } - return m.validateStub(ctx) + return m.validateStub(ctx, opts) } var ( diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 12568e20..9b4db9a8 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -365,12 +365,20 @@ type ValidateResult struct { OK bool } +// ValidateOpts are options for a validate operation. +type ValidateOpts struct { + // TargetVersion is a specific migration version to validate up to. The + // version must exist. When set, validation only checks that migrations up + // to and including TargetVersion have been applied. + TargetVersion int +} + // Validate validates the current state of migrations, returning an unsuccessful // validation and usable message in case there are migrations that haven't yet // been applied. -func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) { +func (m *Migrator[TTx]) Validate(ctx context.Context, opts *ValidateOpts) (*ValidateResult, error) { return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*ValidateResult, error) { - return m.validate(ctx, tx) + return m.validate(ctx, tx, opts) }) } @@ -379,8 +387,8 @@ func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) { // been applied. // // This variant lets a caller validate within a transaction. -func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error) { - return m.validate(ctx, m.driver.UnwrapExecutor(tx)) +func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx, opts *ValidateOpts) (*ValidateResult, error) { + return m.validate(ctx, m.driver.UnwrapExecutor(tx), opts) } // migrateDown runs down migrations. @@ -472,7 +480,11 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor } // validate validates current migration state. -func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) (*ValidateResult, error) { +func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor, opts *ValidateOpts) (*ValidateResult, error) { + if opts == nil { + opts = &ValidateOpts{} + } + existingMigrations, err := m.existingMigrations(ctx, exec) if err != nil { return nil, err @@ -483,6 +495,18 @@ func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) delete(targetMigrations, migrateRow.Version) } + if opts.TargetVersion > 0 { + if _, ok := m.migrations[opts.TargetVersion]; !ok { + return nil, fmt.Errorf("version %d is not a valid River migration version", opts.TargetVersion) + } + + for version := range targetMigrations { + if version > opts.TargetVersion { + delete(targetMigrations, version) + } + } + } + notOKWithMessage := func(message string) *ValidateResult { m.Logger.InfoContext(ctx, m.Name+": "+message) return &ValidateResult{Messages: []string{message}} diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index ef44f29c..3ea3ffee 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -647,7 +647,37 @@ func TestMigrator(t *testing.T) { _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) + require.NoError(t, err) + require.Equal(t, &ValidateResult{OK: true}, res) + }) + + t.Run("ValidateSuccessWithTargetVersion", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + + res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion}) + require.NoError(t, err) + require.Equal(t, &ValidateResult{OK: true}, res) + }) + + t.Run("ValidateTxSuccessWithTargetVersion", func(t *testing.T) { + t.Parallel() + + const migrateVersionIncludingRiverJob = 2 + + migrator, bundle := setup(t) + + tx := testTx(t, bundle.driver) + + _, err := migrator.MigrateTx(ctx, tx, DirectionUp, &MigrateOpts{TargetVersion: migrateVersionIncludingRiverJob}) + require.NoError(t, err) + + res, err := migrator.ValidateTx(ctx, tx, &ValidateOpts{TargetVersion: migrateVersionIncludingRiverJob}) require.NoError(t, err) require.Equal(t, &ValidateResult{OK: true}, res) }) @@ -660,13 +690,37 @@ func TestMigrator(t *testing.T) { _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion}) require.NoError(t, err) - res, err := migrator.Validate(ctx) + res, err := migrator.Validate(ctx, nil) require.NoError(t, err) require.Equal(t, &ValidateResult{ Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion+1, migrationsBundle.MaxVersion+2)}, }, res) }) + t.Run("ValidateUnappliedMigrationsWithTargetVersion", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion - 1}) + require.NoError(t, err) + + res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion + 1}) + require.NoError(t, err) + require.Equal(t, &ValidateResult{ + Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion, migrationsBundle.MaxVersion+1)}, + }, res) + }) + + t.Run("ValidateWithTargetVersionInvalid", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + _, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.WithTestVersionsMaxVersion + 77}) + require.EqualError(t, err, fmt.Sprintf("version %d is not a valid River migration version", migrationsBundle.WithTestVersionsMaxVersion+77)) + }) + t.Run("MigrateUpThenDownToZeroAndBackUp", func(t *testing.T) { t.Parallel()