Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: Linters

on: [push, pull_request]
on:
push:
branches: [master]
pull_request:

jobs:
linters:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/semgrep.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: semgrep

on: [push, pull_request]
on:
push:
branches: [master]
pull_request:

jobs:
semgrep:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: tests

on: [push, pull_request]
on:
push:
branches: [master]
pull_request:

jobs:
golang:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ tests/vendor/
cmd
rr
**/old
.claude/settings.local.json
8 changes: 4 additions & 4 deletions fsm/fsm.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package fsm

import (
"log/slog"
"sync/atomic"

"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

// NewFSM returns new FSM implementation based on initial state
func NewFSM(initialState int64, log *zap.Logger) *Fsm {
func NewFSM(initialState int64, log *slog.Logger) *Fsm {
f := &Fsm{log: log}
f.currentState.Store(initialState)
return f
}

// Fsm is general https://en.wikipedia.org/wiki/Finite-state_machine to transition between worker states
type Fsm struct {
log *zap.Logger
log *slog.Logger
numExecs atomic.Uint64
// to be lightweight, use UnixNano
lastUsed atomic.Uint64
Expand All @@ -38,7 +38,7 @@ Transition moves worker from one state to another
func (s *Fsm) Transition(to int64) {
err := s.recognizer(to)
if err != nil {
s.log.Debug("transition info, this is not an error", zap.String("debug", err.Error()))
s.log.Debug("transition info, this is not an error", "reason", err.Error())
return
}

Expand Down
184 changes: 179 additions & 5 deletions fsm/state_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package fsm

import (
"log/slog"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/stretchr/testify/require"
)

func Test_NewState(t *testing.T) {
log, err := zap.NewDevelopment()
assert.NoError(t, err)
log := slog.Default()
st := NewFSM(StateErrored, log)

assert.Equal(t, "errored", st.String())
Expand All @@ -22,11 +23,184 @@ func Test_NewState(t *testing.T) {
}

func Test_IsActive(t *testing.T) {
log, err := zap.NewDevelopment()
assert.NoError(t, err)
log := slog.Default()
assert.False(t, NewFSM(StateInactive, log).IsActive())
assert.True(t, NewFSM(StateReady, log).IsActive())
assert.True(t, NewFSM(StateWorking, log).IsActive())
assert.False(t, NewFSM(StateStopped, log).IsActive())
assert.False(t, NewFSM(StateErrored, log).IsActive())
}

// TestFSM_Recognizer_ValidTransitions verifies every legal transition path the pool uses succeeds.
// If a valid path is accidentally broken, workers freeze in their current state forever.
func TestFSM_Recognizer_ValidTransitions(t *testing.T) {
log := slog.Default()

tests := []struct {
name string
path []int64
}{
{
name: "normal exec cycle: Inactive->Ready->Working->Ready",
path: []int64{StateInactive, StateReady, StateWorking, StateReady},
},
{
name: "supervisor TTL: Ready->StateTTLReached",
path: []int64{StateReady, StateTTLReached},
},
{
name: "supervisor memory: Ready->StateMaxMemoryReached",
path: []int64{StateReady, StateMaxMemoryReached},
},
{
name: "supervisor idle: Ready->StateIdleTTLReached",
path: []int64{StateReady, StateIdleTTLReached},
},
{
name: "supervisor marks busy worker: Working->StateInvalid",
path: []int64{StateReady, StateWorking, StateInvalid},
},
{
name: "max jobs reached: Ready->StateMaxJobsReached",
path: []int64{StateReady, StateMaxJobsReached},
},
{
name: "destroy from ready: Ready->StateDestroyed",
path: []int64{StateReady, StateDestroyed},
},
{
name: "destroy from working: Working->StateDestroyed",
path: []int64{StateReady, StateWorking, StateDestroyed},
},
{
name: "destroy from inactive: Inactive->StateDestroyed",
path: []int64{StateInactive, StateDestroyed},
},
{
name: "graceful stop: Working->StateStopping->StateStopped",
path: []int64{StateReady, StateWorking, StateStopping, StateStopped},
},
{
name: "exec TTL reached: Working->StateExecTTLReached",
path: []int64{StateReady, StateWorking, StateExecTTLReached},
},
{
name: "error from ready: Ready->StateErrored",
path: []int64{StateReady, StateErrored},
},
{
// Documents intentional behavior: any non-destroyed state can transition to Inactive.
// The recognizer only blocks Destroyed->Inactive; everything else falls through.
name: "Ready->Inactive (allow-all for Inactive)",
path: []int64{StateReady, StateInactive},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.GreaterOrEqual(t, len(tt.path), 2, "path must have at least 2 states")
f := NewFSM(tt.path[0], log)
for i := 1; i < len(tt.path); i++ {
f.Transition(tt.path[i])
assert.Equal(t, tt.path[i], f.CurrentState(),
"transition to %d failed from %d at step %d", tt.path[i], tt.path[i-1], i)
}
})
}
}

// TestFSM_Recognizer_BlockedTransitions verifies illegal transitions are rejected.
// Documents the state machine contract — terminal states can't resurrect, self-transitions are rejected, and invalid source states are blocked.
func TestFSM_Recognizer_BlockedTransitions(t *testing.T) {
log := slog.Default()

tests := []struct {
name string
from int64
to int64
}{
{name: "Destroyed->Ready", from: StateDestroyed, to: StateReady},
{name: "Destroyed->Working", from: StateDestroyed, to: StateWorking},
{name: "Destroyed->Inactive", from: StateDestroyed, to: StateInactive},
{name: "Destroyed->StateInvalid", from: StateDestroyed, to: StateInvalid},
{name: "Destroyed->StateStopping", from: StateDestroyed, to: StateStopping},
{name: "Ready->Ready", from: StateReady, to: StateReady},
{name: "Working->Working (only from Ready)", from: StateWorking, to: StateWorking},
{name: "Stopped->Ready (not in allowed from list)", from: StateStopped, to: StateReady},
{name: "StateErrored->Ready", from: StateErrored, to: StateReady},
{name: "StateErrored->Working", from: StateErrored, to: StateWorking},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := NewFSM(tt.from, log)
f.Transition(tt.to)
assert.Equal(t, tt.from, f.CurrentState(),
"transition from %d to %d should have been blocked", tt.from, tt.to)
})
}
}

// TestFSM_ConcurrentTransitions verifies state consistency under concurrent transitions.
// There's a TOCTOU gap between recognizer reading currentState and Store —
// the supervisor, exec path, and watcher all call Transition concurrently.
func TestFSM_ConcurrentTransitions(t *testing.T) {
log := slog.Default()
const goroutines = 100

// Phase 1: N goroutines all attempting Ready -> Working
f := NewFSM(StateReady, log)
var wg sync.WaitGroup
for range goroutines {
wg.Go(func() {
f.Transition(StateWorking)
})
}
wg.Wait()
assert.Equal(t, StateWorking, f.CurrentState())

// Phase 2: N goroutines all attempting Working -> Ready
for range goroutines {
wg.Go(func() {
f.Transition(StateReady)
})
}
wg.Wait()
assert.Equal(t, StateReady, f.CurrentState())

// Phase 3: Rapid cycle Ready -> Working -> Ready under concurrency
f2 := NewFSM(StateReady, log)
for range goroutines {
wg.Go(func() {
f2.Transition(StateWorking)
f2.Transition(StateReady)
})
}
wg.Wait()
// Final state must be one of the valid states
state := f2.CurrentState()
assert.True(t, state == StateReady || state == StateWorking,
"final state should be Ready or Working, got %d", state)
}

// TestFSM_RegisterExec_Concurrent validates that NumExecs is accurate under concurrency.
// Load-bearing for MaxJobs enforcement — if undercounts, workers never get replaced.
func TestFSM_RegisterExec_Concurrent(t *testing.T) {
log := slog.Default()
f := NewFSM(StateReady, log)

const goroutines = 100
const execsPerGoroutine = 100

var wg sync.WaitGroup
for range goroutines {
wg.Go(func() {
for range execsPerGoroutine {
f.RegisterExec()
}
})
}
wg.Wait()

assert.Equal(t, uint64(goroutines*execsPerGoroutine), f.NumExecs())
}
14 changes: 5 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
module github.com/roadrunner-server/pool
module github.com/roadrunner-server/pool/v2

go 1.26

require (
github.com/roadrunner-server/errors v1.4.1
github.com/roadrunner-server/events v1.0.1
github.com/roadrunner-server/goridge/v3 v3.8.3
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.1
golang.org/x/sync v0.19.0
golang.org/x/sync v0.20.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.39.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
golang.org/x/sys v0.42.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
24 changes: 6 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -19,9 +16,8 @@ github.com/roadrunner-server/errors v1.4.1 h1:LKNeaCGiwd3t8IaL840ZNF3UA9yDQlpvHn
github.com/roadrunner-server/errors v1.4.1/go.mod h1:qeffnIKG0e4j1dzGpa+OGY5VKSfMphizvqWIw8s2lAo=
github.com/roadrunner-server/events v1.0.1 h1:waCkKhxhzdK3VcI1xG22l+h+0J+Nfdpxjhyy01Un+kI=
github.com/roadrunner-server/events v1.0.1/go.mod h1:WZRqoEVaFm209t52EuoT7ISUtvX6BrCi6bI/7pjkVC0=
github.com/roadrunner-server/goridge/v3 v3.8.3 h1:XmjrOFnI6ZbQTPaP39DEk8KwLUNTgjluK3pcZaW6ixQ=
github.com/roadrunner-server/goridge/v3 v3.8.3/go.mod h1:4TZU8zgkKIZCsH51qwGMpvyXCT59u/8z6q8sCe4ZGAQ=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 h1:dO1wKnuMr4xMmH6DY2ZaZ6FWS+Vo50+C7fuAcyO/xBk=
github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1/go.mod h1:+gKla9HAyYlk0TsC9VktwtOL63aimsWT3oPsuCLh4/o=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
Expand All @@ -34,22 +30,14 @@ github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9R
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/goridge/v3/pkg/relay"
"github.com/roadrunner-server/goridge/v4/pkg/frame"
"github.com/roadrunner-server/goridge/v4/pkg/relay"
)

type StopCommand struct {
Expand Down
Loading
Loading