Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add metadata assertions to rivertest. [PR #1137](https://github.com/riverqueue/river/pull/1137).

## [0.30.2] - 2026-01-26

### Fixed
Expand Down
106 changes: 106 additions & 0 deletions rivertest/rivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"slices"
"strings"
"testing"
Expand Down Expand Up @@ -50,6 +51,13 @@ type RequireInsertedOpts struct {
// No assertion is made if left the zero value.
MaxAttempts int

// Metadata is a subset of job metadata to assert against. Only the keys and
// values provided are compared, and any extra metadata on the job is
// ignored.
//
// No assertion is made if left nil or empty.
Metadata map[string]any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just related to our convo last weekend, wanted to quickly get your thoughts on allowing people to do something like specify an exact match versus partial match.

If this became:

	Metadata MetadataMatcher

And you had something like:

type MetadataMatcher interface {
	CheckMatch(t testingT, actualBytes []byte, requireNotInserted bool) (bool, []string)
}

type metadataContainsMatcher struct {
	expected map[string]any
}

// MetadataContains checks that the given key-value pairs are present in job
// metadata, ignoring any extra keys that may also be there.
func MetadataContains(expected map[string]any) metadataContainsMatcher {
	return metadataContainsMatcher{expected: expected}
}

type metadataExactlyMatcher struct {
	expected map[string]any
}

// MetadataExactly checks that the given key-value pairs are present in job
// metadata, and that no extra keys are present.
func MetadataExactly(expected map[string]any) metadataExactlyMatcher {
	return metadataExactlyMatcher{expected: expected}
}

You get a pretty clean check that's a bit more readable than what's in here right now like:

			opts.Metadata = MetadataContains(map[string]any{
				"key": "value",
			})

The downside is that sometimes these interface-based checks can be a little hard to figure out how to use.

Thoughts?


// Priority is the expected priority for the inserted job.
//
// No assertion is made if left the zero value.
Expand Down Expand Up @@ -501,6 +509,16 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
}
}

if len(expectedOpts.Metadata) > 0 {
metadataMatches, metadataFailures := compareMetadataSubset(t, jobRow.Metadata, expectedOpts.Metadata, requireNotInserted)

if !metadataMatches && requireNotInserted {
return true
}

failures = append(failures, metadataFailures...)
}

if expectedOpts.Priority != 0 {
if jobRow.Priority == expectedOpts.Priority {
if requireNotInserted {
Expand Down Expand Up @@ -594,6 +612,94 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
return false
}

func compareMetadataSubset(t testingT, jobMetadataBytes []byte, expectedMetadata map[string]any, requireNotInserted bool) (bool, []string) {
t.Helper()

jobMetadata := map[string]any{}
if len(jobMetadataBytes) > 0 {
if err := json.Unmarshal(jobMetadataBytes, &jobMetadata); err != nil {
failuref(t, "Internal failure: error unmarshaling job metadata: %s", err)
}
}

keys := make([]string, 0, len(expectedMetadata))
for key := range expectedMetadata {
keys = append(keys, key)
}
slices.Sort(keys)

failures := make([]string, 0, len(keys))
allMatch := true
for _, key := range keys {
expectedValue := expectedMetadata[key]

actualValue, ok := jobMetadata[key]
if !ok {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata missing key '%s'", key))
continue
}

if expectedValue == nil {
if actualValue == nil {
if requireNotInserted {
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded null", key))
}
} else {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected null", key, formatMetadataValue(actualValue)))
}
continue
}

normalizedExpected, err := normalizeMetadataValue(expectedValue)
if err != nil {
failuref(t, "Internal failure: error normalizing metadata for key '%s': %s", key, err)
}

if reflect.DeepEqual(actualValue, normalizedExpected) {
if requireNotInserted {
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded %s", key, formatMetadataValue(normalizedExpected)))
}
} else {
allMatch = false
if requireNotInserted {
return false, nil
}
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected %s", key, formatMetadataValue(actualValue), formatMetadataValue(normalizedExpected)))
}
}

return allMatch, failures
}

func formatMetadataValue(value any) string {
encoded, err := json.Marshal(value)
if err != nil {
return fmt.Sprintf("%v", value)
}
return string(encoded)
}

func normalizeMetadataValue(value any) (any, error) {
encoded, err := json.Marshal(value)
if err != nil {
return nil, err
}

var normalized any
if err := json.Unmarshal(encoded, &normalized); err != nil {
return nil, err
}
return normalized, nil
}

// failuref takes a printf-style directive and is a shortcut for failing an
// assertion.
func failuref(t testingT, format string, a ...any) {
Expand Down
65 changes: 65 additions & 0 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,41 @@ func TestRequireInsertedTx(t *testing.T) {
mockT.LogOutput())
})

t.Run("Metadata", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
Metadata: []byte(`{"key":"value","list":[1,2],"nested":{"enabled":true},"num":1}`),
})
require.NoError(t, err)

mockT := testutil.NewMockT(t)
opts := &RequireInsertedOpts{
Metadata: map[string]any{
"key": "value",
"nested": map[string]any{"enabled": true},
"num": int64(1),
},
}
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput())

mockT = testutil.NewMockT(t)
opts = &RequireInsertedOpts{
Metadata: map[string]any{
"key": "wrong",
"missing": "value",
},
}
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.True(t, mockT.Failed)
require.Equal(t,
failureString("Job with kind 'job2' metadata[key] \"value\" not equal to expected \"wrong\", metadata missing key 'missing'")+"\n",
mockT.LogOutput())
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -587,6 +622,36 @@ func TestRequireNotInsertedTx(t *testing.T) {
mockT.LogOutput())
})

t.Run("Metadata", func(t *testing.T) {
t.Parallel()

riverClient, bundle := setup(t)

_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
Metadata: []byte(`{"key":"value"}`),
})
require.NoError(t, err)

mockT := testutil.NewMockT(t)
opts := emptyOpts()
opts.Metadata = map[string]any{
"key": "value",
}
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.True(t, mockT.Failed)
require.Equal(t,
failureString("Job with kind 'job2' metadata[key] equal to excluded \"value\"")+"\n",
mockT.LogOutput())

mockT = testutil.NewMockT(t)
opts = emptyOpts()
opts.Metadata = map[string]any{
"key": "other",
}
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
require.False(t, mockT.Failed)
})

t.Run("Priority", func(t *testing.T) {
t.Parallel()

Expand Down
Loading