Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
name: Go lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v1.63.4
GOLANGCI_LINT_VERSION: v1.64.6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were on 1.64.4 elsewhere so we probably need to bump everywhere.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Luckily I don't think the new versions make that big of a difference (at least between 1.63 and 1.64). The main thing for some of them recently was getting in Go 1.24 support. But yeah, we should start rolling all the versions forward as we get to them.

permissions:
contents: read
# allow read access to pull request. Use with `only-new-issues` option.
Expand Down
2 changes: 1 addition & 1 deletion cmd/riverui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"github.com/rs/cors"
sloghttp "github.com/samber/slog-http"

"github.com/riverqueue/apiframe/apimiddleware"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"

"riverqueue.com/riverui"
"riverqueue.com/riverui/internal/apimiddleware"
)

func main() {
Expand Down
Empty file removed dist/.gitkeep
Empty file.
12 changes: 4 additions & 8 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,19 @@ River UI consists of two apps: a Go backend API, and a TypeScript UI frontend.

## Environment

The project uses a combination of direnv and a `.env` file (to suit Vite conventions). Copy the example and edit as necessary:

```sh
cp .env.example .env.local
```

## Install dependencies

```sh
go get ./...
npm install
```

## Install Reflex

This project uses [Reflex](https://github.com/cespare/reflex) for local dev. Install it.

``` sh
go install github.com/cespare/reflex@latest
```

## Running the UI and API together

```sh
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
module riverqueue.com/riverui

go 1.22.0
go 1.23.0
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I tried not to change this, but the new repo really wanted to be Go 1.23 (and run of go mod tidy would change it to such), and when I brought that in here, of course it'd take this project to 1.23 as well.

Probably okay though since we don't really need to make as much an effort to support very old Go versions for this project, and currently only 1.23 and 1.24 are supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it's fine, latest 2 Go versions is a reasonable and common stance.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, +1. SG.


toolchain go1.23.5
toolchain go1.24.1

require (
github.com/go-playground/validator/v10 v10.25.0
github.com/google/uuid v1.6.0
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pgx/v5 v5.7.2
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f
github.com/riverqueue/river v0.18.0
github.com/riverqueue/river/riverdriver v0.18.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.18.0
Expand All @@ -24,6 +23,8 @@ require (
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.25.0 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f h1:Pr3+ERes1GkCJnw/gpi9yVx9tN7VCcPjuZCFN/OBaZg=
github.com/riverqueue/apiframe v0.0.0-20250310051455-203f3fd8260f/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/river v0.18.0 h1:sGHeTOL9MR8+pMIVHRm59fzet8Ron/xjF3Yq/PSGb78=
github.com/riverqueue/river v0.18.0/go.mod h1:oapX5xb/L2YnkE801QubDZ0COHxVxEGVY37icPzghhU=
github.com/riverqueue/river/riverdriver v0.18.0 h1:a2haR5I0MQLHjLCSVFpUEeJALCLemRl5zCztucysm1E=
Expand All @@ -49,8 +51,8 @@ github.com/riverqueue/river/rivertype v0.18.0 h1:YsXR5NbLAzniurGO0+zcISWMKq7Y71x
github.com/riverqueue/river/rivertype v0.18.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/samber/slog-http v1.5.1 h1:z5Ty/u5LKJbWjmjLDr6OgtwHXrPPrH2ZPoQE/47n9sU=
Expand Down
5 changes: 2 additions & 3 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"

"github.com/riverqueue/apiframe/apiendpoint"
"github.com/riverqueue/apiframe/apimiddleware"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/valutil"

"riverqueue.com/riverui/internal/apiendpoint"
"riverqueue.com/riverui/internal/apimiddleware"
)

// DB is the interface for a pgx database connection.
Expand Down
44 changes: 28 additions & 16 deletions handler_api_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"

"github.com/riverqueue/apiframe/apiendpoint"
"github.com/riverqueue/apiframe/apierror"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"

"riverqueue.com/riverui/internal/apiendpoint"
"riverqueue.com/riverui/internal/apierror"
"riverqueue.com/riverui/internal/dbsqlc"
"riverqueue.com/riverui/internal/querycacher"
"riverqueue.com/riverui/internal/util/pgxutil"
Expand Down Expand Up @@ -104,7 +104,7 @@ func (a *healthCheckGetEndpoint) Execute(ctx context.Context, req *healthCheckGe
// fall through to OK status response below

default:
return nil, apierror.NewNotFound("Health check %q not found. Use either `complete` or `minimal`.", req.Name)
return nil, apierror.NewNotFoundf("Health check %q not found. Use either `complete` or `minimal`.", req.Name)
}

return statusResponseOK, nil
Expand Down Expand Up @@ -142,7 +142,7 @@ func (a *jobCancelEndpoint) Execute(ctx context.Context, req *jobCancelRequest)
job, err := a.client.JobCancelTx(ctx, tx, jobID)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundJob(jobID)
return nil, NewNotFoundJob(jobID)
}
return nil, err
}
Expand Down Expand Up @@ -185,10 +185,10 @@ func (a *jobDeleteEndpoint) Execute(ctx context.Context, req *jobDeleteRequest)
_, err := a.client.JobDeleteTx(ctx, tx, jobID)
if err != nil {
if errors.Is(err, rivertype.ErrJobRunning) {
return nil, apierror.NewBadRequest("Job %d is running and can't be deleted until it finishes.", jobID)
return nil, apierror.NewBadRequestf("Job %d is running and can't be deleted until it finishes.", jobID)
}
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundJob(jobID)
return nil, NewNotFoundJob(jobID)
}
return nil, err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (req *jobGetRequest) ExtractRaw(r *http.Request) error {

jobID, err := strconv.ParseInt(idString, 10, 64)
if err != nil {
return apierror.NewBadRequest("Couldn't convert job ID to int64: %s.", err)
return apierror.NewBadRequestf("Couldn't convert job ID to int64: %s.", err)
}
req.JobID = jobID

Expand All @@ -239,7 +239,7 @@ func (a *jobGetEndpoint) Execute(ctx context.Context, req *jobGetRequest) (*Rive
job, err := a.client.JobGetTx(ctx, tx, req.JobID)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundJob(req.JobID)
return nil, NewNotFoundJob(req.JobID)
}
return nil, fmt.Errorf("error getting job: %w", err)
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func (req *jobListRequest) ExtractRaw(r *http.Request) error {
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
return apierror.NewBadRequest("Couldn't convert `limit` to integer: %s.", err)
return apierror.NewBadRequestf("Couldn't convert `limit` to integer: %s.", err)
}

req.Limit = &limit
Expand Down Expand Up @@ -344,7 +344,7 @@ func (a *jobRetryEndpoint) Execute(ctx context.Context, req *jobRetryRequest) (*
_, err := a.client.JobRetryTx(ctx, tx, jobID)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundJob(jobID)
return nil, NewNotFoundJob(jobID)
}
return nil, err
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func (a *queueGetEndpoint) Execute(ctx context.Context, req *queueGetRequest) (*
queue, err := a.client.QueueGetTx(ctx, tx, req.Name)
if err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundQueue(req.Name)
return nil, NewNotFoundQueue(req.Name)
}
return nil, fmt.Errorf("error getting queue: %w", err)
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (req *queueListRequest) ExtractRaw(r *http.Request) error {
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
return apierror.NewBadRequest("Couldn't convert `limit` to integer: %s.", err)
return apierror.NewBadRequestf("Couldn't convert `limit` to integer: %s.", err)
}

req.Limit = &limit
Expand Down Expand Up @@ -490,7 +490,7 @@ func (a *queuePauseEndpoint) Execute(ctx context.Context, req *queuePauseRequest
return pgxutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*statusResponse, error) {
if err := a.client.QueuePauseTx(ctx, tx, req.Name, nil); err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundQueue(req.Name)
return nil, NewNotFoundQueue(req.Name)
}
return nil, fmt.Errorf("error pausing queue: %w", err)
}
Expand Down Expand Up @@ -532,7 +532,7 @@ func (a *queueResumeEndpoint) Execute(ctx context.Context, req *queueResumeReque
return pgxutil.WithTxV(ctx, a.dbPool, func(ctx context.Context, tx pgx.Tx) (*statusResponse, error) {
if err := a.client.QueueResumeTx(ctx, tx, req.Name, nil); err != nil {
if errors.Is(err, river.ErrNotFound) {
return nil, apierror.NewNotFoundQueue(req.Name)
return nil, NewNotFoundQueue(req.Name)
}
return nil, fmt.Errorf("error resuming queue: %w", err)
}
Expand Down Expand Up @@ -670,7 +670,7 @@ func (a *workflowGetEndpoint) Execute(ctx context.Context, req *workflowGetReque
}

if len(jobs) < 1 {
return nil, apierror.NewNotFoundWorkflow(req.ID)
return nil, NewNotFoundWorkflow(req.ID)
}

return &workflowGetResponse{
Expand Down Expand Up @@ -712,7 +712,7 @@ func (req *workflowListRequest) ExtractRaw(r *http.Request) error {
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
return apierror.NewBadRequest("Couldn't convert `limit` to integer: %s.", err)
return apierror.NewBadRequestf("Couldn't convert `limit` to integer: %s.", err)
}

req.Limit = &limit
Expand Down Expand Up @@ -758,6 +758,18 @@ func (a *workflowListEndpoint) Execute(ctx context.Context, req *workflowListReq
}
}

func NewNotFoundJob(jobID int64) *apierror.NotFound {
return apierror.NewNotFoundf("Job not found: %d.", jobID)
}

func NewNotFoundQueue(name string) *apierror.NotFound {
return apierror.NewNotFoundf("Queue not found: %s.", name)
}

func NewNotFoundWorkflow(id string) *apierror.NotFound {
return apierror.NewNotFoundf("Workflow not found: %s.", id)
}

type RiverJob struct {
ID int64 `json:"id"`
Args json.RawMessage `json:"args"`
Expand Down
20 changes: 10 additions & 10 deletions handler_api_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/require"

"github.com/riverqueue/apiframe/apierror"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"

"riverqueue.com/riverui/internal/apierror"
"riverqueue.com/riverui/internal/riverinternaltest"
"riverqueue.com/riverui/internal/riverinternaltest/testfactory"
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestHandlerHealthCheckGetEndpoint(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newHealthCheckGetEndpoint)

_, err := endpoint.Execute(ctx, &healthCheckGetRequest{Name: "other"})
requireAPIError(t, apierror.NewNotFound("Health check %q not found. Use either `complete` or `minimal`.", "other"), err)
requireAPIError(t, apierror.NewNotFoundf("Health check %q not found. Use either `complete` or `minimal`.", "other"), err)
})
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestJobCancelEndpoint(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newJobCancelEndpoint)

_, err := endpoint.Execute(ctx, &jobCancelRequest{JobIDs: []int64String{123}})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
requireAPIError(t, NewNotFoundJob(123), err)
})
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func TestJobDeleteEndpoint(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newJobDeleteEndpoint)

_, err := endpoint.Execute(ctx, &jobDeleteRequest{JobIDs: []int64String{123}})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
requireAPIError(t, NewNotFoundJob(123), err)
})
}

Expand All @@ -201,7 +201,7 @@ func TestJobGetEndpoint(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newJobGetEndpoint)

_, err := endpoint.Execute(ctx, &jobGetRequest{JobID: 123})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
requireAPIError(t, NewNotFoundJob(123), err)
})
}

Expand Down Expand Up @@ -322,7 +322,7 @@ func TestJobRetryEndpoint(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newJobRetryEndpoint)

_, err := endpoint.Execute(ctx, &jobRetryRequest{JobIDs: []int64String{123}})
requireAPIError(t, apierror.NewNotFoundJob(123), err)
requireAPIError(t, NewNotFoundJob(123), err)
})
}

Expand Down Expand Up @@ -353,7 +353,7 @@ func TestAPIHandlerQueueGet(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newQueueGetEndpoint)

_, err := endpoint.Execute(ctx, &queueGetRequest{Name: "does_not_exist"})
requireAPIError(t, apierror.NewNotFoundQueue("does_not_exist"), err)
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}

Expand Down Expand Up @@ -420,7 +420,7 @@ func TestAPIHandlerQueuePause(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newQueuePauseEndpoint)

_, err := endpoint.Execute(ctx, &queuePauseRequest{Name: "does_not_exist"})
requireAPIError(t, apierror.NewNotFoundQueue("does_not_exist"), err)
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}

Expand Down Expand Up @@ -449,7 +449,7 @@ func TestAPIHandlerQueueResume(t *testing.T) {
endpoint, _ := setupEndpoint(ctx, t, newQueueResumeEndpoint)

_, err := endpoint.Execute(ctx, &queueResumeRequest{Name: "does_not_exist"})
requireAPIError(t, apierror.NewNotFoundQueue("does_not_exist"), err)
requireAPIError(t, NewNotFoundQueue("does_not_exist"), err)
})
}

Expand Down Expand Up @@ -577,7 +577,7 @@ func TestAPIHandlerWorkflowGet(t *testing.T) {
workflowID := uuid.New()

_, err := endpoint.Execute(ctx, &workflowGetRequest{ID: workflowID.String()})
requireAPIError(t, apierror.NewNotFoundWorkflow(workflowID.String()), err)
requireAPIError(t, NewNotFoundWorkflow(workflowID.String()), err)
})
}

Expand Down
Loading
Loading