diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b319a40..a5f2cbdb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255)
+
 ## [0.38.0] - 2026-05-22
 
 ### Added
diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go
index 22598eb9..45e2ff2b 100644
--- a/rivershared/riverpilot/standard_pilot.go
+++ b/rivershared/riverpilot/standard_pilot.go
@@ -3,12 +3,18 @@ package riverpilot
 import (
 	"context"
 	"sync/atomic"
+	"time"
 
 	"github.com/riverqueue/river/riverdriver"
 	"github.com/riverqueue/river/rivershared/baseservice"
 	"github.com/riverqueue/river/rivertype"
 )
 
+// standardPilotJobGetAvailableTimeoutDefault bounds how long a single
+// StandardPilot.JobGetAvailable call may spend in the executor before its
+// context is cancelled.
+const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second
+
 type StandardPilot struct {
 	seq atomic.Int64
 }
@@ -19,6 +25,13 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
 	if params.MaxToLock <= 0 {
 		return nil, nil
 	}
+
+	// Bound the fetch so a stalled executor call cannot block the caller
+	// forever. Cancellation of the parent context (and its cause) still
+	// propagates through the derived context.
+	ctx, cancel := context.WithTimeout(ctx, standardPilotJobGetAvailableTimeoutDefault)
+	defer cancel()
+
 	return exec.JobGetAvailable(ctx, params)
 }
 
diff --git a/rivershared/riverpilot/standard_pilot_test.go b/rivershared/riverpilot/standard_pilot_test.go
new file mode 100644
index 00000000..ae7d4254
--- /dev/null
+++ b/rivershared/riverpilot/standard_pilot_test.go
@@ -0,0 +1,101 @@
+package riverpilot
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/riverqueue/river/riverdriver"
+	"github.com/riverqueue/river/rivertype"
+)
+
+// standardPilotExecutorMock is a test double that embeds riverdriver.Executor
+// and overrides only JobGetAvailable, forwarding calls to jobGetAvailableFunc.
+// The embedded interface is left nil by setup, so any other method panics.
+type standardPilotExecutorMock struct {
+	riverdriver.Executor
+
+	jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
+}
+
+// JobGetAvailable forwards to the per-test jobGetAvailableFunc stub.
+func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
+	return m.jobGetAvailableFunc(ctx, params)
+}
+
+func TestStandardPilot_JobGetAvailable(t *testing.T) {
+	t.Parallel()
+
+	type testBundle struct {
+		exec  *standardPilotExecutorMock
+		pilot *StandardPilot
+	}
+
+	setup := func(t *testing.T) *testBundle {
+		t.Helper()
+
+		return &testBundle{
+			exec:  &standardPilotExecutorMock{},
+			pilot: &StandardPilot{},
+		}
+	}
+
+	t.Run("ReturnsNilWhenMaxToLockIsZero", func(t *testing.T) {
+		t.Parallel()
+
+		bundle := setup(t)
+
+		// jobGetAvailableFunc stays nil on purpose: the pilot must return
+		// before reaching the executor, and invoking a nil func would panic.
+		res, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{})
+		require.NoError(t, err)
+		require.Nil(t, res)
+	})
+
+	t.Run("AppliesTimeoutToExecutorContext", func(t *testing.T) {
+		t.Parallel()
+
+		bundle := setup(t)
+
+		var (
+			deadline    time.Time
+			hasDeadline bool
+		)
+		bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
+			deadline, hasDeadline = ctx.Deadline()
+			return nil, nil
+		}
+
+		start := time.Now()
+		_, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{
+			MaxToLock: 1,
+		})
+		require.NoError(t, err)
+		require.True(t, hasDeadline)
+		require.WithinDuration(t, start.Add(standardPilotJobGetAvailableTimeoutDefault), deadline, time.Second)
+	})
+
+	t.Run("PreservesParentCancellation", func(t *testing.T) {
+		t.Parallel()
+
+		bundle := setup(t)
+		parentErr := errors.New("parent cancelled")
+		parentCtx, cancel := context.WithCancelCause(context.Background())
+		cancel(parentErr)
+
+		// The stub reports the context's cause, so the assertion below checks
+		// that the derived timeout context still carries the parent's cause.
+		bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
+			<-ctx.Done()
+			return nil, context.Cause(ctx)
+		}
+
+		_, err := bundle.pilot.JobGetAvailable(parentCtx, bundle.exec, nil, &riverdriver.JobGetAvailableParams{
+			MaxToLock: 1,
+		})
+		require.ErrorIs(t, err, parentErr)
+	})
+}