Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.24.1
require (
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.2
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf
github.com/riverqueue/river v0.18.0
github.com/riverqueue/river/riverdriver v0.18.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.18.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f h1:Pr3+ERes1GkCJnw/gpi9yVx9tN7VCcPjuZCFN/OBaZg=
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/apiframe v0.0.0-20250310054829-19e112904670 h1:c2t1Rgb/VC07gUXEO4OnPBKRd6Oct5yECyhxtVyIO1M=
github.com/riverqueue/apiframe v0.0.0-20250310054829-19e112904670/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/apiframe v0.0.0-20250310055546-28a3f81b743f h1:LzrDfxieq6nT71TovkyLSTYhUzTbRoV0U2llKxFUtnA=
github.com/riverqueue/apiframe v0.0.0-20250310055546-28a3f81b743f/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf h1:y0ZXBnVCUuqKNhld/VVIix5pYVPjzOZu3J48wDpatSU=
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/river v0.18.0 h1:sGHeTOL9MR8+pMIVHRm59fzet8Ron/xjF3Yq/PSGb78=
github.com/riverqueue/river v0.18.0/go.mod h1:oapX5xb/L2YnkE801QubDZ0COHxVxEGVY37icPzghhU=
github.com/riverqueue/river/riverdriver v0.18.0 h1:a2haR5I0MQLHjLCSVFpUEeJALCLemRl5zCztucysm1E=
Expand Down
67 changes: 34 additions & 33 deletions handler_api_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/apiframe/apierror"
"github.com/riverqueue/apiframe/apitest"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/riversharedtest"
Expand Down Expand Up @@ -68,7 +69,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint)

resp, err := endpoint.Execute(ctx, &healthCheckGetRequest{Name: healthCheckNameComplete})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameComplete})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)
})
Expand All @@ -81,7 +82,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) {
// Roll back prematurely so we get a database error.
require.NoError(t, bundle.tx.Rollback(ctx))

_, err := endpoint.Execute(ctx, &healthCheckGetRequest{Name: healthCheckNameComplete})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameComplete})
requireAPIError(t, apierror.WithInternalError(
apierror.NewServiceUnavailable("Unable to query database. Check logs for details."),
pgx.ErrTxClosed,
Expand All @@ -93,7 +94,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint)

resp, err := endpoint.Execute(ctx, &healthCheckGetRequest{Name: healthCheckNameMinimal})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: healthCheckNameMinimal})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)
})
Expand All @@ -103,7 +104,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint)

_, err := endpoint.Execute(ctx, &healthCheckGetRequest{Name: "other"})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &healthCheckGetRequest{Name: "other"})
requireAPIError(t, apierror.NewNotFoundf("Health check %q not found. Use either `complete` or `minimal`.", "other"), err)
})
}
Expand All @@ -121,7 +122,7 @@ func TestJobCancelEndpoint(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

resp, err := endpoint.Execute(ctx, &jobCancelRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobCancelRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)

Expand All @@ -139,7 +140,7 @@ func TestJobCancelEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newJobCancelEndpoint)

_, err := endpoint.Execute(ctx, &jobCancelRequest{JobIDs: []int64String{123}})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobCancelRequest{JobIDs: []int64String{123}})
requireAPIError(t, NewNotFoundJob(123), err)
})
}
Expand All @@ -157,7 +158,7 @@ func TestJobDeleteEndpoint(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

resp, err := endpoint.Execute(ctx, &jobDeleteRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobDeleteRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)

Expand All @@ -173,7 +174,7 @@ func TestJobDeleteEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newJobDeleteEndpoint)

_, err := endpoint.Execute(ctx, &jobDeleteRequest{JobIDs: []int64String{123}})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobDeleteRequest{JobIDs: []int64String{123}})
requireAPIError(t, NewNotFoundJob(123), err)
})
}
Expand All @@ -190,7 +191,7 @@ func TestJobGetEndpoint(t *testing.T) {

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

resp, err := endpoint.Execute(ctx, &jobGetRequest{JobID: job.ID})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobGetRequest{JobID: job.ID})
require.NoError(t, err)
require.Equal(t, job.ID, resp.ID)
})
Expand All @@ -200,7 +201,7 @@ func TestJobGetEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newJobGetEndpoint)

_, err := endpoint.Execute(ctx, &jobGetRequest{JobID: 123})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobGetRequest{JobID: 123})
requireAPIError(t, NewNotFoundJob(123), err)
})
}
Expand All @@ -226,7 +227,7 @@ func TestAPIHandlerJobList(t *testing.T) {
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStatePending)})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})

resp, err := endpoint.Execute(ctx, &jobListRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, job1.ID, resp.Data[0].ID)
Expand All @@ -241,7 +242,7 @@ func TestAPIHandlerJobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

resp, err := endpoint.Execute(ctx, &jobListRequest{Limit: ptrutil.Ptr(1)})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{Limit: ptrutil.Ptr(1)})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, job1.ID, resp.Data[0].ID)
Expand All @@ -258,7 +259,7 @@ func TestAPIHandlerJobList(t *testing.T) {
// Other states excluded.
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)})

resp, err := endpoint.Execute(ctx, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateCompleted)})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateCompleted)})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, job2.ID, resp.Data[0].ID) // order inverted
Expand All @@ -276,7 +277,7 @@ func TestAPIHandlerJobList(t *testing.T) {
// Other states excluded.
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})

resp, err := endpoint.Execute(ctx, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateAvailable)})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobListRequest{State: ptrutil.Ptr(rivertype.JobStateAvailable)})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, job1.ID, resp.Data[0].ID)
Expand All @@ -303,7 +304,7 @@ func TestJobRetryEndpoint(t *testing.T) {
State: ptrutil.Ptr(rivertype.JobStateDiscarded),
})

resp, err := endpoint.Execute(ctx, &jobRetryRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobRetryRequest{JobIDs: []int64String{int64String(job1.ID), int64String(job2.ID)}})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)

Expand All @@ -321,7 +322,7 @@ func TestJobRetryEndpoint(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newJobRetryEndpoint)

_, err := endpoint.Execute(ctx, &jobRetryRequest{JobIDs: []int64String{123}})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &jobRetryRequest{JobIDs: []int64String{123}})
requireAPIError(t, NewNotFoundJob(123), err)
})
}
Expand All @@ -341,7 +342,7 @@ func TestAPIHandlerQueueGet(t *testing.T) {
_, err := bundle.client.InsertTx(ctx, bundle.tx, &noOpArgs{}, &river.InsertOpts{Queue: queue.Name})
require.NoError(t, err)

resp, err := endpoint.Execute(ctx, &queueGetRequest{Name: queue.Name})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueGetRequest{Name: queue.Name})
require.NoError(t, err)
require.Equal(t, 1, resp.CountAvailable)
require.Equal(t, queue.Name, resp.Name)
Expand All @@ -352,7 +353,7 @@ func TestAPIHandlerQueueGet(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newQueueGetEndpoint)

_, err := endpoint.Execute(ctx, &queueGetRequest{Name: "does_not_exist"})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueGetRequest{Name: "does_not_exist"})
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}
Expand All @@ -373,7 +374,7 @@ func TestAPIHandlerQueueList(t *testing.T) {
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &queue1.Name})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &queue2.Name})

resp, err := endpoint.Execute(ctx, &queueListRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueListRequest{})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, 1, resp.Data[0].CountAvailable)
Expand All @@ -390,7 +391,7 @@ func TestAPIHandlerQueueList(t *testing.T) {
queue1 := testfactory.Queue(ctx, t, bundle.exec, nil)
_ = testfactory.Queue(ctx, t, bundle.exec, nil)

resp, err := endpoint.Execute(ctx, &queueListRequest{Limit: ptrutil.Ptr(1)})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueListRequest{Limit: ptrutil.Ptr(1)})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, queue1.Name, resp.Data[0].Name)
Expand All @@ -409,7 +410,7 @@ func TestAPIHandlerQueuePause(t *testing.T) {

queue := testfactory.Queue(ctx, t, bundle.exec, nil)

resp, err := endpoint.Execute(ctx, &queuePauseRequest{Name: queue.Name})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queuePauseRequest{Name: queue.Name})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)
})
Expand All @@ -419,7 +420,7 @@ func TestAPIHandlerQueuePause(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newQueuePauseEndpoint)

_, err := endpoint.Execute(ctx, &queuePauseRequest{Name: "does_not_exist"})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queuePauseRequest{Name: "does_not_exist"})
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}
Expand All @@ -438,7 +439,7 @@ func TestAPIHandlerQueueResume(t *testing.T) {
PausedAt: ptrutil.Ptr(time.Now()),
})

resp, err := endpoint.Execute(ctx, &queueResumeRequest{Name: queue.Name})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueResumeRequest{Name: queue.Name})
require.NoError(t, err)
require.Equal(t, statusResponseOK, resp)
})
Expand All @@ -448,7 +449,7 @@ func TestAPIHandlerQueueResume(t *testing.T) {

endpoint, _ := setupEndpoint(ctx, t, newQueueResumeEndpoint)

_, err := endpoint.Execute(ctx, &queueResumeRequest{Name: "does_not_exist"})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &queueResumeRequest{Name: "does_not_exist"})
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}
Expand Down Expand Up @@ -493,7 +494,7 @@ func TestStateAndCountGetEndpoint(t *testing.T) {
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)})
}

resp, err := endpoint.Execute(ctx, &stateAndCountGetRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{})
require.NoError(t, err)
require.Equal(t, &stateAndCountGetResponse{
Available: 1,
Expand All @@ -520,7 +521,7 @@ func TestStateAndCountGetEndpoint(t *testing.T) {
_, err := endpoint.queryCacher.RunQuery(ctx)
require.NoError(t, err)

resp, err := endpoint.Execute(ctx, &stateAndCountGetRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{})
require.NoError(t, err)
require.Equal(t, &stateAndCountGetResponse{
Available: queryCacheSkipThreshold + 1,
Expand All @@ -540,7 +541,7 @@ func TestStateAndCountGetEndpoint(t *testing.T) {
_, err := endpoint.queryCacher.RunQuery(ctx)
require.NoError(t, err)

resp, err := endpoint.Execute(ctx, &stateAndCountGetRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &stateAndCountGetRequest{})
require.NoError(t, err)
require.Equal(t, &stateAndCountGetResponse{
Available: queryCacheSkipThreshold - 1,
Expand All @@ -562,7 +563,7 @@ func TestAPIHandlerWorkflowGet(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: mustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID})})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: mustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID})})

resp, err := endpoint.Execute(ctx, &workflowGetRequest{ID: workflowID.String()})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowGetRequest{ID: workflowID.String()})
require.NoError(t, err)
require.Len(t, resp.Tasks, 2)
require.Equal(t, job1.ID, resp.Tasks[0].ID)
Expand All @@ -576,7 +577,7 @@ func TestAPIHandlerWorkflowGet(t *testing.T) {

workflowID := uuid.New()

_, err := endpoint.Execute(ctx, &workflowGetRequest{ID: workflowID.String()})
_, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowGetRequest{ID: workflowID.String()})
requireAPIError(t, NewNotFoundWorkflow(workflowID.String()), err)
})
}
Expand Down Expand Up @@ -607,7 +608,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) {
})

t.Run("All", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{})
require.NoError(t, err)
require.Len(t, resp.Data, 2)
require.Equal(t, 1, resp.Data[0].CountCancelled)
Expand All @@ -629,7 +630,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) {
})

t.Run("Active", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{State: "active"})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{State: "active"})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, 0, resp.Data[0].CountAvailable)
Expand All @@ -646,7 +647,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) {
})

t.Run("Inactive", func(t *testing.T) {
resp, err := endpoint.Execute(ctx, &workflowListRequest{State: "inactive"})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{State: "inactive"})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, 1, resp.Data[0].CountCompleted)
Expand All @@ -671,7 +672,7 @@ func TestAPIHandlerWorkflowList(t *testing.T) {
State: ptrutil.Ptr(rivertype.JobStateScheduled),
})

resp, err := endpoint.Execute(ctx, &workflowListRequest{Limit: ptrutil.Ptr(1)})
resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, &workflowListRequest{Limit: ptrutil.Ptr(1)})
require.NoError(t, err)
require.Len(t, resp.Data, 1)
require.Equal(t, "2", resp.Data[0].ID) // DESC order means last one gets returned
Expand Down
Loading