Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255)

## [0.38.0] - 2026-05-22

### Added
Expand Down
7 changes: 7 additions & 0 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package riverpilot
import (
"context"
"sync/atomic"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)

const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second

type StandardPilot struct {
seq atomic.Int64
}
Expand All @@ -19,6 +22,10 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
if params.MaxToLock <= 0 {
return nil, nil
}

ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded)
defer cancel()

return exec.JobGetAvailable(ctx, params)
}

Expand Down
69 changes: 69 additions & 0 deletions rivershared/riverpilot/standard_pilot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package riverpilot

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)

type standardPilotExecutorMock struct {
riverdriver.Executor

jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
}

func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
return m.jobGetAvailableFunc(ctx, params)
}

func TestStandardPilot_JobGetAvailable(t *testing.T) {
t.Parallel()

type testBundle struct {
exec *standardPilotExecutorMock
pilot *StandardPilot
}

setup := func(t *testing.T) *testBundle {
t.Helper()

return &testBundle{
exec: &standardPilotExecutorMock{},
pilot: &StandardPilot{},
}
}

t.Run("ReturnsNilWhenMaxToLockIsZero", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

res, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{})
require.NoError(t, err)
require.Nil(t, res)
})

t.Run("PreservesParentCancellation", func(t *testing.T) {
t.Parallel()

bundle := setup(t)
parentErr := errors.New("parent cancelled")
parentCtx, cancel := context.WithCancelCause(context.Background())
cancel(parentErr)

bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
<-ctx.Done()
return nil, context.Cause(ctx)
}

_, err := bundle.pilot.JobGetAvailable(parentCtx, bundle.exec, nil, &riverdriver.JobGetAvailableParams{
MaxToLock: 1,
})
require.ErrorIs(t, err, parentErr)
})
}