Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Added

- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259)

## [0.38.0] - 2026-05-22

### Added
Expand Down
2 changes: 1 addition & 1 deletion cmd/river/rivercli/driver_procurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type MigratorInterface interface {
ExistingVersions(ctx context.Context) ([]rivermigrate.Migration, error)
GetVersion(version int) (rivermigrate.Migration, error)
Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
Validate(ctx context.Context) (*rivermigrate.ValidateResult, error)
Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error)
}

type pgxV5DriverProcurer struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/river/rivercli/river_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (c *validate) Run(ctx context.Context, opts *validateOpts) (bool, error) {
return false, err
}

res, err := migrator.Validate(ctx)
res, err := migrator.Validate(ctx, nil)
if err != nil {
return false, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/river/rivercli/river_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type MigratorStub struct {
existingVersionsStub func(ctx context.Context) ([]rivermigrate.Migration, error)
getVersionStub func(version int) (rivermigrate.Migration, error)
migrateStub func(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error)
validateStub func(ctx context.Context) (*rivermigrate.ValidateResult, error)
validateStub func(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error)
}

func (m *MigratorStub) AllVersions() []rivermigrate.Migration {
Expand Down Expand Up @@ -103,12 +103,12 @@ func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direc
return m.migrateStub(ctx, direction, opts)
}

func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) {
func (m *MigratorStub) Validate(ctx context.Context, opts *rivermigrate.ValidateOpts) (*rivermigrate.ValidateResult, error) {
if m.validateStub == nil {
panic("Validate is not stubbed")
}

return m.validateStub(ctx)
return m.validateStub(ctx, opts)
}

var (
Expand Down
34 changes: 29 additions & 5 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,20 @@ type ValidateResult struct {
OK bool
}

// ValidateOpts are options for a validate operation.
type ValidateOpts struct {
// TargetVersion is a specific migration version to validate up to. The
// version must exist. When set, validation only checks that migrations up
// to and including TargetVersion have been applied.
TargetVersion int
}

// Validate validates the current state of migrations, returning an unsuccessful
// validation and usable message in case there are migrations that haven't yet
// been applied.
func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) {
func (m *Migrator[TTx]) Validate(ctx context.Context, opts *ValidateOpts) (*ValidateResult, error) {
return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*ValidateResult, error) {
return m.validate(ctx, tx)
return m.validate(ctx, tx, opts)
})
}

Expand All @@ -379,8 +387,8 @@ func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) {
// been applied.
//
// This variant lets a caller validate within a transaction.
func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error) {
return m.validate(ctx, m.driver.UnwrapExecutor(tx))
func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx, opts *ValidateOpts) (*ValidateResult, error) {
return m.validate(ctx, m.driver.UnwrapExecutor(tx), opts)
}

// migrateDown runs down migrations.
Expand Down Expand Up @@ -472,7 +480,11 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor
}

// validate validates current migration state.
func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) (*ValidateResult, error) {
func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor, opts *ValidateOpts) (*ValidateResult, error) {
if opts == nil {
opts = &ValidateOpts{}
}

existingMigrations, err := m.existingMigrations(ctx, exec)
if err != nil {
return nil, err
Expand All @@ -483,6 +495,18 @@ func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor)
delete(targetMigrations, migrateRow.Version)
}

if opts.TargetVersion > 0 {
if _, ok := m.migrations[opts.TargetVersion]; !ok {
return nil, fmt.Errorf("version %d is not a valid River migration version", opts.TargetVersion)
}

for version := range targetMigrations {
if version > opts.TargetVersion {
delete(targetMigrations, version)
}
}
}

notOKWithMessage := func(message string) *ValidateResult {
m.Logger.InfoContext(ctx, m.Name+": "+message)
return &ValidateResult{Messages: []string{message}}
Expand Down
58 changes: 56 additions & 2 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,37 @@ func TestMigrator(t *testing.T) {
_, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

res, err := migrator.Validate(ctx)
res, err := migrator.Validate(ctx, nil)
require.NoError(t, err)
require.Equal(t, &ValidateResult{OK: true}, res)
})

t.Run("ValidateSuccessWithTargetVersion", func(t *testing.T) {
t.Parallel()

migrator, _ := setup(t)

_, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion})
require.NoError(t, err)

res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion})
require.NoError(t, err)
require.Equal(t, &ValidateResult{OK: true}, res)
})

t.Run("ValidateTxSuccessWithTargetVersion", func(t *testing.T) {
t.Parallel()

const migrateVersionIncludingRiverJob = 2

migrator, bundle := setup(t)

tx := testTx(t, bundle.driver)

_, err := migrator.MigrateTx(ctx, tx, DirectionUp, &MigrateOpts{TargetVersion: migrateVersionIncludingRiverJob})
require.NoError(t, err)

res, err := migrator.ValidateTx(ctx, tx, &ValidateOpts{TargetVersion: migrateVersionIncludingRiverJob})
require.NoError(t, err)
require.Equal(t, &ValidateResult{OK: true}, res)
})
Expand All @@ -660,13 +690,37 @@ func TestMigrator(t *testing.T) {
_, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{MaxSteps: migrationsBundle.MaxVersion})
require.NoError(t, err)

res, err := migrator.Validate(ctx)
res, err := migrator.Validate(ctx, nil)
require.NoError(t, err)
require.Equal(t, &ValidateResult{
Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion+1, migrationsBundle.MaxVersion+2)},
}, res)
})

t.Run("ValidateUnappliedMigrationsWithTargetVersion", func(t *testing.T) {
t.Parallel()

migrator, _ := setup(t)

_, err := migrator.Migrate(ctx, DirectionUp, &MigrateOpts{TargetVersion: migrationsBundle.MaxVersion - 1})
require.NoError(t, err)

res, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.MaxVersion + 1})
require.NoError(t, err)
require.Equal(t, &ValidateResult{
Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", migrationsBundle.MaxVersion, migrationsBundle.MaxVersion+1)},
}, res)
})

t.Run("ValidateWithTargetVersionInvalid", func(t *testing.T) {
t.Parallel()

migrator, _ := setup(t)

_, err := migrator.Validate(ctx, &ValidateOpts{TargetVersion: migrationsBundle.WithTestVersionsMaxVersion + 77})
require.EqualError(t, err, fmt.Sprintf("version %d is not a valid River migration version", migrationsBundle.WithTestVersionsMaxVersion+77))
})

t.Run("MigrateUpThenDownToZeroAndBackUp", func(t *testing.T) {
t.Parallel()

Expand Down
Loading