diff --git a/client_pilot_test.go b/client_pilot_test.go index 7f47cd4b..ef9d4bff 100644 --- a/client_pilot_test.go +++ b/client_pilot_test.go @@ -34,6 +34,8 @@ type pilotSpy struct { } type pilotSpyTestSignals struct { + JobBegin testsignal.TestSignal[int64] + JobEnd testsignal.TestSignal[int64] JobGetAvailable testsignal.TestSignal[struct{}] JobSetStateIfRunningMany testsignal.TestSignal[struct{}] PeriodicJobGetAll testsignal.TestSignal[struct{}] @@ -47,6 +49,8 @@ type pilotSpyTestSignals struct { } func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) { + ts.JobBegin.Init(tb) + ts.JobEnd.Init(tb) ts.JobGetAvailable.Init(tb) ts.JobSetStateIfRunningMany.Init(tb) ts.PeriodicJobGetAll.Init(tb) @@ -59,6 +63,11 @@ func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) { ts.QueueMetadataChanged.Init(tb) } +func (p *pilotSpy) JobBegin(ctx context.Context, job *rivertype.JobRow) { + p.testSignals.JobBegin.Signal(job.ID) + p.StandardPilot.JobBegin(ctx, job) +} + func (p *pilotSpy) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { p.jobCancelCalls.Add(1) return p.StandardPilot.JobCancel(ctx, exec, params) @@ -69,6 +78,11 @@ func (p *pilotSpy) JobCleanerQueuesExcluded() []string { return p.StandardPilot.JobCleanerQueuesExcluded() } +func (p *pilotSpy) JobEnd(ctx context.Context, job *rivertype.JobRow) { + p.testSignals.JobEnd.Signal(job.ID) + p.StandardPilot.JobEnd(ctx, job) +} + func (p *pilotSpy) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state riverpilot.ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { p.testSignals.JobGetAvailable.Signal(struct{}{}) return p.StandardPilot.JobGetAvailable(ctx, exec, state, params) @@ -324,6 +338,8 @@ func Test_Client_PilotUsage(t *testing.T) { riversharedtest.WaitOrTimeout(t, jobDone) require.NotZero(t, insertRes.Job.ID) + require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobBegin.WaitOrTimeout()) + require.Equal(t, insertRes.Job.ID, pilot.testSignals.JobEnd.WaitOrTimeout()) pilot.testSignals.JobGetAvailable.WaitOrTimeout() pilot.testSignals.JobSetStateIfRunningMany.WaitOrTimeout() pilot.testSignals.ProducerInit.WaitOrTimeout() diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index dab44f39..cfe3b0ab 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -21,6 +21,7 @@ import ( "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivertype" ) @@ -114,6 +115,7 @@ type JobExecutor struct { HookLookupGlobal hooklookup.HookLookupInterface JobRow *rivertype.JobRow MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface + Pilot riverpilot.Pilot ProducerCallbacks struct { JobDone func(jobRow *rivertype.JobRow) Stuck func() @@ -143,7 +145,16 @@ func (e *JobExecutor) Execute(ctx context.Context) { QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), } + if e.Pilot != nil { + e.Pilot.JobBegin(ctx, e.JobRow) + } + res := e.execute(ctx) + + if e.Pilot != nil { + e.Pilot.JobEnd(ctx, e.JobRow) + } + if res.Err != nil && errors.Is(context.Cause(ctx), rivertype.ErrJobCancelledRemotely) { res.Err = context.Cause(ctx) } diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 1674fa7e..439dbc73 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -112,6 +112,25 @@ func (h *testErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRo return h.HandlePanicFunc(ctx, job, panicVal, trace) } +type testPilotWithJobCallbacks struct { + riverpilot.StandardPilot + + JobBeginFunc func(ctx context.Context, job *rivertype.JobRow) + JobEndFunc func(ctx context.Context, job *rivertype.JobRow) +} + +func (p *testPilotWithJobCallbacks) JobBegin(ctx context.Context, job *rivertype.JobRow) { + if p.JobBeginFunc != nil { + p.JobBeginFunc(ctx, job) + } +} + +func (p *testPilotWithJobCallbacks) JobEnd(ctx context.Context, job *rivertype.JobRow) { + if p.JobEndFunc != nil { + p.JobEndFunc(ctx, job) + } +} + func TestJobExecutor_Execute(t *testing.T) { t.Parallel() @@ -995,6 +1014,90 @@ func TestJobExecutor_Execute(t *testing.T) { require.True(t, bundle.errorHandler.HandlePanicCalled) }) + t.Run("PilotJobCallbacksJobBeginAndEnd", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var events []string + + executor.Pilot = &testPilotWithJobCallbacks{ + JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "begin") + }, + JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "end") + }, + } + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { + events = append(events, "work") + return nil + }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + riversharedtest.WaitOrTimeout(t, bundle.updateCh) + + require.Equal(t, []string{"begin", "work", "end"}, events) + }) + + t.Run("PilotJobCallbacksJobEndInvokedOnError", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var events []string + + executor.Pilot = &testPilotWithJobCallbacks{ + JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "begin") + }, + JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "end") + }, + } + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { + events = append(events, "work") + return errors.New("work failed") + }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + riversharedtest.WaitOrTimeout(t, bundle.updateCh) + + require.Equal(t, []string{"begin", "work", "end"}, events) + }) + + t.Run("PilotJobCallbacksJobEndInvokedOnPanic", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + var events []string + + executor.Pilot = &testPilotWithJobCallbacks{ + JobBeginFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "begin") + }, + JobEndFunc: func(ctx context.Context, job *rivertype.JobRow) { + require.Equal(t, bundle.jobRow.ID, job.ID) + events = append(events, "end") + }, + } + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { + events = append(events, "work") + panic("panic val") + }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(ctx) + riversharedtest.WaitOrTimeout(t, bundle.updateCh) + + require.Equal(t, []string{"begin", "work", "end"}, events) + }) + t.Run("CancelFuncCleanedUpEvenWithoutCancel", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index d1f44ffd..2ef49424 100644 --- a/internal/maintenance/job_rescuer_test.go +++ b/internal/maintenance/job_rescuer_test.go @@ -140,6 +140,7 @@ func TestJobRescuer(t *testing.T) { stuckToRetryJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)}) stuckToRetryJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) + stuckToRetryJobWithLastSeen := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), Metadata: fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt, bundle.rescueHorizon.Add(-1*time.Minute).UTC().Format(time.RFC3339Nano)), MaxAttempts: ptrutil.Ptr(5)}) stuckToRetryJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued // Already at max attempts: @@ -183,6 +184,7 @@ func TestJobRescuer(t *testing.T) { var err error confirmRetried(stuckToRetryJob1) confirmRetried(stuckToRetryJob2) + confirmRetried(stuckToRetryJobWithLastSeen) job3After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: stuckToRetryJob3.ID, Schema: rescuer.Config.Schema}) require.NoError(t, err) diff --git a/producer.go b/producer.go index c753814c..a309f727 100644 --- a/producer.go +++ b/producer.go @@ -831,6 +831,7 @@ func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype. HookLookupGlobal: p.config.HookLookupGlobal, MiddlewareLookupGlobal: p.config.MiddlewareLookupGlobal, JobRow: job, + Pilot: p.pilot, ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) Stuck func() diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index ac01941a..bbeb44ef 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -609,7 +609,13 @@ const jobGetStuck = `-- name: JobGetStuck :many SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE state = 'running' - AND attempted_at < $1::timestamptz + -- ` + "`" + `last_seen_at` + "`" + ` may still be present on a row from its last retry, so make + -- sure we have ` + "`" + `max` + "`" + ` to take ` + "`" + `attempted_at` + "`" + ` (set on the latest lock of the + -- job) if it's larger. + AND greatest( + attempted_at, + (metadata->>'river:last_seen_at')::timestamptz + ) < $1::timestamptz ORDER BY id LIMIT $2 ` diff --git a/riverdriver/riverdrivertest/job_read.go b/riverdriver/riverdrivertest/job_read.go index 5649d910..3a3e3c19 100644 --- a/riverdriver/riverdrivertest/job_read.go +++ b/riverdriver/riverdrivertest/job_read.go @@ -2,6 +2,7 @@ package riverdrivertest import ( "context" + "fmt" "sort" "strconv" "testing" @@ -12,6 +13,7 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" + riversharedmaintenance "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/sliceutil" @@ -519,8 +521,14 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx afterHorizon = horizon.Add(1 * time.Minute) ) - stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + metadataWithLastSeen := func(t time.Time) []byte { + return fmt.Appendf(nil, `{"%s": %q}`, riversharedmaintenance.MetadataKeyLastSeenAt, + t.UTC().Round(time.Millisecond).Format("2006-01-02 15:04:05.999-07:00")) + } + + stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: []byte(`{"other":"value"}`), State: ptrutil.Ptr(rivertype.JobStateRunning)}) stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) t.Logf("horizon = %s", horizon) t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) @@ -528,7 +536,7 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) - // Not returned because we put a maximum of two. + // Not returned because we put a maximum of three. _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) // Not stuck because not in running state. @@ -537,13 +545,19 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx // Not stuck because after queried horizon. _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Max two stuck + // Not stuck because last seen is after queried horizon. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, Metadata: metadataWithLastSeen(afterHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + // Not stuck because attempted at is after queried horizon. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, Metadata: metadataWithLastSeen(beforeHorizon), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + // Max three stuck. stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: 2, + Max: 3, StuckHorizon: horizon, }) require.NoError(t, err) - require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID, stuckJob3.ID}, sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index dde85de4..154acf60 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -258,7 +258,13 @@ ORDER BY id; SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' - AND attempted_at < @stuck_horizon::timestamptz + -- `last_seen_at` may still be present on a row from its last retry, so make + -- sure we have `max` to take `attempted_at` (set on the latest lock of the + -- job) if it's larger. + AND greatest( + attempted_at, + (metadata->>'river:last_seen_at')::timestamptz + ) < @stuck_horizon::timestamptz ORDER BY id LIMIT @max; @@ -726,4 +732,4 @@ SET metadata = CASE WHEN @metadata_do_update::boolean THEN @metadata::jsonb ELSE metadata END, state = CASE WHEN @state_do_update::boolean THEN @state::/* TEMPLATE: schema */river_job_state ELSE state END WHERE id = @id -RETURNING *; \ No newline at end of file +RETURNING *; diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 0be29561..cfc32ac6 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -591,7 +591,13 @@ const jobGetStuck = `-- name: JobGetStuck :many SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE state = 'running' - AND attempted_at < $1::timestamptz + -- ` + "`" + `last_seen_at` + "`" + ` may still be present on a row from its last retry, so make + -- sure we have ` + "`" + `max` + "`" + ` to take ` + "`" + `attempted_at` + "`" + ` (set on the latest lock of the + -- job) if it's larger. + AND greatest( + attempted_at, + (metadata->>'river:last_seen_at')::timestamptz + ) < $1::timestamptz ORDER BY id LIMIT $2 ` diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..bebf7746 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -185,7 +185,15 @@ ORDER BY id; SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' - AND attempted_at < cast(@stuck_horizon AS text) + -- `last_seen_at` may still be present on a row from its last retry, so make + -- sure we have `max` to take `attempted_at` (set on the latest lock of the + -- job) if it's larger. + -- + -- `coalesce` is necessary because `max(NULL, ...)` always returns `NULL`. + AND max( + attempted_at, + coalesce(json_extract(metadata, '$."river:last_seen_at"'), attempted_at) + ) < cast(@stuck_horizon AS text) ORDER BY id LIMIT @max; @@ -513,4 +521,4 @@ SET metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END, state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END WHERE id = @id -RETURNING *; \ No newline at end of file +RETURNING *; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 1963523d..358aea63 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -561,7 +561,15 @@ const jobGetStuck = `-- name: JobGetStuck :many SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE state = 'running' - AND attempted_at < cast(?1 AS text) + -- ` + "`" + `last_seen_at` + "`" + ` may still be present on a row from its last retry, so make + -- sure we have ` + "`" + `max` + "`" + ` to take ` + "`" + `attempted_at` + "`" + ` (set on the latest lock of the + -- job) if it's larger. + -- + -- ` + "`" + `coalesce` + "`" + ` is necessary because ` + "`" + `max(NULL, ...)` + "`" + ` always returns ` + "`" + `NULL` + "`" + `. + AND max( + attempted_at, + coalesce(json_extract(metadata, '$."river:last_seen_at"'), attempted_at) + ) < cast(?1 AS text) ORDER BY id LIMIT ?2 ` diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index e42ef797..f7c55e24 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -19,6 +19,9 @@ import ( type Pilot interface { PilotPeriodicJob + // JobBegin is invoked when a job begins work. + JobBegin(ctx context.Context, job *rivertype.JobRow) + JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) // JobCleanerQueuesExcluded returns queues that should be excluded from the @@ -28,6 +31,10 @@ type Pilot interface { // conceivably be changed.) JobCleanerQueuesExcluded() []string + // JobEnd is invoked when a job ends work. It's still invoked even in the + // event of error or panic. + JobEnd(ctx context.Context, job *rivertype.JobRow) + JobGetAvailable( ctx context.Context, exec riverdriver.Executor, diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 22598eb9..c7dfa6ec 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -13,8 +13,20 @@ type StandardPilot struct { seq atomic.Int64 } +func (p *StandardPilot) JobBegin(ctx context.Context, job *rivertype.JobRow) { + // No-op +} + +func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { + return exec.JobCancel(ctx, params) +} + func (p *StandardPilot) JobCleanerQueuesExcluded() []string { return nil } +func (p *StandardPilot) JobEnd(ctx context.Context, job *rivertype.JobRow) { + // No-op +} + func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { if params.MaxToLock <= 0 { return nil, nil @@ -22,10 +34,6 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return exec.JobGetAvailable(ctx, params) } -func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { - return exec.JobCancel(ctx, params) -} - func (p *StandardPilot) JobInsertMany( ctx context.Context, exec riverdriver.Executor, diff --git a/rivershared/riversharedmaintenance/river_shared_maintenance.go b/rivershared/riversharedmaintenance/river_shared_maintenance.go index 82487300..151c9d65 100644 --- a/rivershared/riversharedmaintenance/river_shared_maintenance.go +++ b/rivershared/riversharedmaintenance/river_shared_maintenance.go @@ -65,6 +65,17 @@ const ( BatchSizeReduced = 1_000 ) +const ( + // MetadataKeyLastSeenAt records the last time a running job was seen by a + // worker. It lives in this package so River Pro can reuse the same metadata + // key without depending on an internal package. + // + // The SQL operations use a hard-coded `river:last_seen_at` because Postgres + // doesn't easily parameterize in the places we tend to want to use it, and + // sqlctemplate's support for replacing strings isn't very pretty. + MetadataKeyLastSeenAt = "river:last_seen_at" +) + // BatchSizes containing batch size information for maintenance services. It's // mean to be embedded on each service's configuration struct so as to provide a // common way of organizing and initializing batch sizes for improved diff --git a/rivertest/worker.go b/rivertest/worker.go index 45c5fb8b..bcc856c5 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -208,6 +208,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job HookLookupByJob: hooklookup.NewJobHookLookup(), JobRow: job, MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(append(rivermiddleware.DefaultMiddleware(), w.config.Middleware...)), + Pilot: w.client.Pilot(), ProducerCallbacks: struct { JobDone func(jobRow *rivertype.JobRow) Stuck func()