diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index b834d6e..f6d8a8b 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -1,6 +1,9 @@ name: Linters -on: [push, pull_request] +on: + push: + branches: [master] + pull_request: jobs: linters: diff --git a/.github/workflows/semgrep.yml b/.github/workflows/semgrep.yml index 069fc06..f2a66a7 100644 --- a/.github/workflows/semgrep.yml +++ b/.github/workflows/semgrep.yml @@ -1,6 +1,9 @@ name: semgrep -on: [push, pull_request] +on: + push: + branches: [master] + pull_request: jobs: semgrep: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e03f7b6..7f0b4be 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,6 +1,9 @@ name: tests -on: [push, pull_request] +on: + push: + branches: [master] + pull_request: jobs: golang: diff --git a/.gitignore b/.gitignore index 3ffd4cc..259e18a 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ tests/vendor/ cmd rr **/old +.claude/settings.local.json diff --git a/fsm/fsm.go b/fsm/fsm.go index d66a5c3..99e6e01 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -1,14 +1,14 @@ package fsm import ( + "log/slog" "sync/atomic" "github.com/roadrunner-server/errors" - "go.uber.org/zap" ) // NewFSM returns new FSM implementation based on initial state -func NewFSM(initialState int64, log *zap.Logger) *Fsm { +func NewFSM(initialState int64, log *slog.Logger) *Fsm { f := &Fsm{log: log} f.currentState.Store(initialState) return f @@ -16,7 +16,7 @@ func NewFSM(initialState int64, log *zap.Logger) *Fsm { // Fsm is general https://en.wikipedia.org/wiki/Finite-state_machine to transition between worker states type Fsm struct { - log *zap.Logger + log *slog.Logger numExecs atomic.Uint64 // to be lightweight, use UnixNano lastUsed atomic.Uint64 @@ -38,7 +38,7 @@ Transition moves worker from one state to another func (s *Fsm) Transition(to int64) { err := s.recognizer(to) if err != nil { - 
s.log.Debug("transition info, this is not an error", zap.String("debug", err.Error())) + s.log.Debug("transition info, this is not an error", "reason", err.Error()) return } diff --git a/fsm/state_test.go b/fsm/state_test.go index 66f051a..06da427 100755 --- a/fsm/state_test.go +++ b/fsm/state_test.go @@ -1,15 +1,16 @@ package fsm import ( + "log/slog" + "sync" "testing" "github.com/stretchr/testify/assert" - "go.uber.org/zap" + "github.com/stretchr/testify/require" ) func Test_NewState(t *testing.T) { - log, err := zap.NewDevelopment() - assert.NoError(t, err) + log := slog.Default() st := NewFSM(StateErrored, log) assert.Equal(t, "errored", st.String()) @@ -22,11 +23,184 @@ func Test_NewState(t *testing.T) { } func Test_IsActive(t *testing.T) { - log, err := zap.NewDevelopment() - assert.NoError(t, err) + log := slog.Default() assert.False(t, NewFSM(StateInactive, log).IsActive()) assert.True(t, NewFSM(StateReady, log).IsActive()) assert.True(t, NewFSM(StateWorking, log).IsActive()) assert.False(t, NewFSM(StateStopped, log).IsActive()) assert.False(t, NewFSM(StateErrored, log).IsActive()) } + +// TestFSM_Recognizer_ValidTransitions verifies every legal transition path the pool uses succeeds. +// If a valid path is accidentally broken, workers freeze in their current state forever. 
+func TestFSM_Recognizer_ValidTransitions(t *testing.T) { + log := slog.Default() + + tests := []struct { + name string + path []int64 + }{ + { + name: "normal exec cycle: Inactive->Ready->Working->Ready", + path: []int64{StateInactive, StateReady, StateWorking, StateReady}, + }, + { + name: "supervisor TTL: Ready->StateTTLReached", + path: []int64{StateReady, StateTTLReached}, + }, + { + name: "supervisor memory: Ready->StateMaxMemoryReached", + path: []int64{StateReady, StateMaxMemoryReached}, + }, + { + name: "supervisor idle: Ready->StateIdleTTLReached", + path: []int64{StateReady, StateIdleTTLReached}, + }, + { + name: "supervisor marks busy worker: Working->StateInvalid", + path: []int64{StateReady, StateWorking, StateInvalid}, + }, + { + name: "max jobs reached: Ready->StateMaxJobsReached", + path: []int64{StateReady, StateMaxJobsReached}, + }, + { + name: "destroy from ready: Ready->StateDestroyed", + path: []int64{StateReady, StateDestroyed}, + }, + { + name: "destroy from working: Working->StateDestroyed", + path: []int64{StateReady, StateWorking, StateDestroyed}, + }, + { + name: "destroy from inactive: Inactive->StateDestroyed", + path: []int64{StateInactive, StateDestroyed}, + }, + { + name: "graceful stop: Working->StateStopping->StateStopped", + path: []int64{StateReady, StateWorking, StateStopping, StateStopped}, + }, + { + name: "exec TTL reached: Working->StateExecTTLReached", + path: []int64{StateReady, StateWorking, StateExecTTLReached}, + }, + { + name: "error from ready: Ready->StateErrored", + path: []int64{StateReady, StateErrored}, + }, + { + // Documents intentional behavior: any non-destroyed state can transition to Inactive. + // The recognizer only blocks Destroyed->Inactive; everything else falls through. 
+ name: "Ready->Inactive (allow-all for Inactive)", + path: []int64{StateReady, StateInactive}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.GreaterOrEqual(t, len(tt.path), 2, "path must have at least 2 states") + f := NewFSM(tt.path[0], log) + for i := 1; i < len(tt.path); i++ { + f.Transition(tt.path[i]) + assert.Equal(t, tt.path[i], f.CurrentState(), + "transition to %d failed from %d at step %d", tt.path[i], tt.path[i-1], i) + } + }) + } +} + +// TestFSM_Recognizer_BlockedTransitions verifies illegal transitions are rejected. +// Documents the state machine contract — terminal states can't resurrect, self-transitions are rejected, and invalid source states are blocked. +func TestFSM_Recognizer_BlockedTransitions(t *testing.T) { + log := slog.Default() + + tests := []struct { + name string + from int64 + to int64 + }{ + {name: "Destroyed->Ready", from: StateDestroyed, to: StateReady}, + {name: "Destroyed->Working", from: StateDestroyed, to: StateWorking}, + {name: "Destroyed->Inactive", from: StateDestroyed, to: StateInactive}, + {name: "Destroyed->StateInvalid", from: StateDestroyed, to: StateInvalid}, + {name: "Destroyed->StateStopping", from: StateDestroyed, to: StateStopping}, + {name: "Ready->Ready", from: StateReady, to: StateReady}, + {name: "Working->Working (only from Ready)", from: StateWorking, to: StateWorking}, + {name: "Stopped->Ready (not in allowed from list)", from: StateStopped, to: StateReady}, + {name: "StateErrored->Ready", from: StateErrored, to: StateReady}, + {name: "StateErrored->Working", from: StateErrored, to: StateWorking}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := NewFSM(tt.from, log) + f.Transition(tt.to) + assert.Equal(t, tt.from, f.CurrentState(), + "transition from %d to %d should have been blocked", tt.from, tt.to) + }) + } +} + +// TestFSM_ConcurrentTransitions verifies state consistency under concurrent transitions. 
+// There's a TOCTOU gap between recognizer reading currentState and Store — +// the supervisor, exec path, and watcher all call Transition concurrently. +func TestFSM_ConcurrentTransitions(t *testing.T) { + log := slog.Default() + const goroutines = 100 + + // Phase 1: N goroutines all attempting Ready -> Working + f := NewFSM(StateReady, log) + var wg sync.WaitGroup + for range goroutines { + wg.Go(func() { + f.Transition(StateWorking) + }) + } + wg.Wait() + assert.Equal(t, StateWorking, f.CurrentState()) + + // Phase 2: N goroutines all attempting Working -> Ready + for range goroutines { + wg.Go(func() { + f.Transition(StateReady) + }) + } + wg.Wait() + assert.Equal(t, StateReady, f.CurrentState()) + + // Phase 3: Rapid cycle Ready -> Working -> Ready under concurrency + f2 := NewFSM(StateReady, log) + for range goroutines { + wg.Go(func() { + f2.Transition(StateWorking) + f2.Transition(StateReady) + }) + } + wg.Wait() + // Final state must be one of the valid states + state := f2.CurrentState() + assert.True(t, state == StateReady || state == StateWorking, + "final state should be Ready or Working, got %d", state) +} + +// TestFSM_RegisterExec_Concurrent validates that NumExecs is accurate under concurrency. +// Load-bearing for MaxJobs enforcement — if undercounts, workers never get replaced. 
+func TestFSM_RegisterExec_Concurrent(t *testing.T) { + log := slog.Default() + f := NewFSM(StateReady, log) + + const goroutines = 100 + const execsPerGoroutine = 100 + + var wg sync.WaitGroup + for range goroutines { + wg.Go(func() { + for range execsPerGoroutine { + f.RegisterExec() + } + }) + } + wg.Wait() + + assert.Equal(t, uint64(goroutines*execsPerGoroutine), f.NumExecs()) +} diff --git a/go.mod b/go.mod index e6d3f79..2453252 100644 --- a/go.mod +++ b/go.mod @@ -1,29 +1,25 @@ -module github.com/roadrunner-server/pool +module github.com/roadrunner-server/pool/v2 go 1.26 require ( github.com/roadrunner-server/errors v1.4.1 github.com/roadrunner-server/events v1.0.1 - github.com/roadrunner-server/goridge/v3 v3.8.3 + github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.11.1 - go.uber.org/zap v1.27.1 - golang.org/x/sync v0.19.0 + golang.org/x/sync v0.20.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.39.0 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + golang.org/x/sys v0.42.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a81fe57..9e42632 100644 --- a/go.sum +++ b/go.sum @@ -6,11 +6,8 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -19,9 +16,8 @@ github.com/roadrunner-server/errors v1.4.1 h1:LKNeaCGiwd3t8IaL840ZNF3UA9yDQlpvHn github.com/roadrunner-server/errors v1.4.1/go.mod h1:qeffnIKG0e4j1dzGpa+OGY5VKSfMphizvqWIw8s2lAo= github.com/roadrunner-server/events v1.0.1 h1:waCkKhxhzdK3VcI1xG22l+h+0J+Nfdpxjhyy01Un+kI= github.com/roadrunner-server/events v1.0.1/go.mod h1:WZRqoEVaFm209t52EuoT7ISUtvX6BrCi6bI/7pjkVC0= -github.com/roadrunner-server/goridge/v3 v3.8.3 h1:XmjrOFnI6ZbQTPaP39DEk8KwLUNTgjluK3pcZaW6ixQ= -github.com/roadrunner-server/goridge/v3 v3.8.3/go.mod h1:4TZU8zgkKIZCsH51qwGMpvyXCT59u/8z6q8sCe4ZGAQ= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1 h1:dO1wKnuMr4xMmH6DY2ZaZ6FWS+Vo50+C7fuAcyO/xBk= +github.com/roadrunner-server/goridge/v4 v4.0.0-beta.1/go.mod h1:+gKla9HAyYlk0TsC9VktwtOL63aimsWT3oPsuCLh4/o= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= @@ -34,22 +30,14 @@ github.com/tklauser/numcpus v0.11.0 
h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9R github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= -go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= -go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= -golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/protocol.go b/internal/protocol.go index 22f2d83..bbc5186 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -6,8 +6,8 @@ import ( "sync" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/goridge/v3/pkg/frame" - "github.com/roadrunner-server/goridge/v3/pkg/relay" + "github.com/roadrunner-server/goridge/v4/pkg/frame" + "github.com/roadrunner-server/goridge/v4/pkg/relay" ) type StopCommand struct { diff --git a/ipc/pipe/pipe.go b/ipc/pipe/pipe.go index bc47a7f..9994b9b 100755 --- a/ipc/pipe/pipe.go +++ b/ipc/pipe/pipe.go @@ -4,23 +4,24 @@ import ( "context" "os/exec" + "log/slog" + "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/goridge/v3/pkg/pipe" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/internal" - "github.com/roadrunner-server/pool/worker" - "go.uber.org/zap" + "github.com/roadrunner-server/goridge/v4/pkg/pipe" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/internal" + "github.com/roadrunner-server/pool/v2/worker" ) // Factory connects to stack using standard // streams (STDIN, STDOUT pipes). type Factory struct { - log *zap.Logger + log *slog.Logger } // NewPipeFactory returns new factory instance and starts // listening -func NewPipeFactory(log *zap.Logger) *Factory { +func NewPipeFactory(log *slog.Logger) *Factory { return &Factory{ log: log, } @@ -34,7 +35,7 @@ type sr struct { // SpawnWorkerWithContext Creates a new Process and connects it to goridge relay, // method Wait() must be handled on the level above. 
func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd, options ...worker.Options) (*worker.Process, error) { - spCh := make(chan sr) + spCh := make(chan sr, 1) go func() { w, err := worker.InitBaseWorker(cmd, options...) if err != nil { diff --git a/ipc/pipe/pipe_spawn_test.go b/ipc/pipe/pipe_spawn_test.go index 6019e3c..b905376 100644 --- a/ipc/pipe/pipe_spawn_test.go +++ b/ipc/pipe/pipe_spawn_test.go @@ -6,15 +6,16 @@ import ( "testing" "time" + "log/slog" + "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) -var log = zap.NewNop() +var log = slog.New(slog.DiscardHandler) func Test_GetState2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") diff --git a/ipc/pipe/pipe_test.go b/ipc/pipe/pipe_test.go index 3f2f14f..4dc0911 100644 --- a/ipc/pipe/pipe_test.go +++ b/ipc/pipe/pipe_test.go @@ -8,11 +8,10 @@ import ( "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func Test_GetState(t *testing.T) { @@ -442,7 +441,6 @@ func Benchmark_WorkerPipeTTL(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := b.Context() - log, _ = zap.NewDevelopment() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) require.NoError(b, err) diff --git a/ipc/socket/socket.go b/ipc/socket/socket.go index 51d9d7b..f745d9e 100755 --- a/ipc/socket/socket.go +++ b/ipc/socket/socket.go @@ -8,14 +8,15 @@ import ( "sync" "time" + "log/slog" + 
"github.com/roadrunner-server/errors" - "github.com/roadrunner-server/goridge/v3/pkg/relay" - "github.com/roadrunner-server/goridge/v3/pkg/socket" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/internal" - "github.com/roadrunner-server/pool/worker" + "github.com/roadrunner-server/goridge/v4/pkg/relay" + "github.com/roadrunner-server/goridge/v4/pkg/socket" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/internal" + "github.com/roadrunner-server/pool/v2/worker" "github.com/shirou/gopsutil/process" - "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -26,11 +27,11 @@ type Factory struct { ls net.Listener // sockets which are waiting for process association relays sync.Map - log *zap.Logger + log *slog.Logger } // NewSocketServer returns Factory attached to a given socket listener. -func NewSocketServer(ls net.Listener, log *zap.Logger) *Factory { +func NewSocketServer(ls net.Listener, log *slog.Logger) *Factory { f := &Factory{ ls: ls, log: log, @@ -50,7 +51,7 @@ func NewSocketServer(ls net.Listener, log *zap.Logger) *Factory { } } - log.Warn("socket server listen", zap.Error(err)) + log.Warn("socket server listen", "error", err) } }() @@ -163,26 +164,26 @@ func (f *Factory) Close() error { return f.ls.Close() } -// waits for Process to connect over socket and returns associated relay of timeout +// waits for Process to connect over socket and returns associated relay or timeout func (f *Factory) findRelayWithContext(ctx context.Context, w *worker.Process) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 10) + defer ticker.Stop() for { + // fast path: check relay map immediately + rl, ok := f.relays.LoadAndDelete(w.Pid()) + if ok { + return rl.(*socket.Relay), nil + } + select { case <-ctx.Done(): return nil, errors.E(errors.Op("findRelayWithContext"), errors.TimeOut) case <-ticker.C: - // check for the process exists + // check if process still exists _, err := 
process.NewProcess(int32(w.Pid())) //nolint:gosec if err != nil { return nil, err } - default: - rl, ok := f.relays.LoadAndDelete(w.Pid()) - if !ok { - continue - } - - return rl.(*socket.Relay), nil } } } diff --git a/ipc/socket/socket_spawn_test.go b/ipc/socket/socket_spawn_test.go index 1d511f6..c182ffc 100644 --- a/ipc/socket/socket_spawn_test.go +++ b/ipc/socket/socket_spawn_test.go @@ -8,13 +8,14 @@ import ( "testing" "time" - "github.com/roadrunner-server/pool/payload" + "log/slog" + + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) -var log = zap.NewNop() +var log = slog.New(slog.DiscardHandler) func Test_Tcp_Start2(t *testing.T) { ls, err := net.Listen("tcp", "127.0.0.1:9007") @@ -263,7 +264,7 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) diff --git a/ipc/socket/socket_test.go b/ipc/socket/socket_test.go index b666b5b..241fe3a 100755 --- a/ipc/socket/socket_test.go +++ b/ipc/socket/socket_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -110,7 +110,7 @@ func Test_Tcp_StartError(t *testing.T) { func Test_Tcp_Failboot(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(t.Context(), time.Second*20) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() ls, err := net.Listen("tcp", "127.0.0.1:9007") diff --git a/pool/allocator.go b/pool/allocator.go index f9b6049..e5f8229 100644 --- a/pool/allocator.go +++ 
b/pool/allocator.go @@ -5,10 +5,11 @@ import ( "os/exec" "time" + "log/slog" + "github.com/roadrunner-server/errors" "github.com/roadrunner-server/events" - "github.com/roadrunner-server/pool/worker" - "go.uber.org/zap" + "github.com/roadrunner-server/pool/v2/worker" "golang.org/x/sync/errgroup" ) @@ -22,7 +23,7 @@ type Factory interface { } // NewPoolAllocator initializes allocator of the workers -func NewPoolAllocator(ctx context.Context, timeout time.Duration, maxExecs uint64, factory Factory, cmd Command, command []string, log *zap.Logger) func() (*worker.Process, error) { +func NewPoolAllocator(ctx context.Context, timeout time.Duration, maxExecs uint64, factory Factory, cmd Command, command []string, log *slog.Logger) func() (*worker.Process, error) { return func() (*worker.Process, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -37,7 +38,7 @@ func NewPoolAllocator(ctx context.Context, timeout time.Duration, maxExecs uint6 } // wrap sync worker - log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.Uint64("max_execs", w.MaxExecs()), zap.String("internal_event_name", events.EventWorkerConstruct.String())) + log.Debug("worker is allocated", "pid", w.Pid(), "max_execs", w.MaxExecs(), "internal_event_name", events.EventWorkerConstruct.String()) return w, nil } } diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index 8fe8981..96149f9 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -5,14 +5,12 @@ import ( "runtime" "github.com/roadrunner-server/events" - "github.com/roadrunner-server/goridge/v3/pkg/frame" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/payload" - "go.uber.org/zap" + "github.com/roadrunner-server/goridge/v4/pkg/frame" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/payload" ) // execDebug used when debug mode was not set and exec_ttl is 0 -// TODO DRY func (sp *Pool) execDebug(ctx context.Context, p 
*payload.Payload, stopCh chan struct{}) (chan *PExec, error) { sp.log.Debug("executing in debug mode, worker will be destroyed after response is received") w, err := sp.allocator() @@ -41,17 +39,17 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s go func() { //nolint:gosec // G118 - intentional: per-iteration exec timeout must be independent of request context // would be called on Goexit defer func() { - sp.log.Debug("stopping [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) + sp.log.Debug("stopping [stream] worker", "pid", w.Pid(), "state", w.State().String()) close(resp) // destroy the worker errD := w.Stop() if errD != nil { sp.log.Debug( "debug mode: worker stopped with error", - zap.String("reason", "worker error"), - zap.Int64("pid", w.Pid()), - zap.String("internal_event_name", events.EventWorkerError.String()), - zap.Error(errD), + "reason", "worker error", + "pid", w.Pid(), + "internal_event_name", events.EventWorkerError.String(), + "error", errD, ) } }() @@ -61,17 +59,17 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s select { // we received stop signal case <-stopCh: - sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) + sp.log.Debug("stream stop signal received", "pid", w.Pid(), "state", w.State().String()) ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) err = w.StreamCancel(ctxT) cancelT() if err != nil { w.State().Transition(fsm.StateErrored) - sp.log.Warn("stream cancel error", zap.Error(err)) + sp.log.Warn("stream cancel error", "error", err) } else { // successfully canceled w.State().Transition(fsm.StateReady) - sp.log.Debug("transition to the ready state", zap.String("from", w.State().String())) + sp.log.Debug("transition to the ready state", "from", w.State().String()) } runtime.Goexit() @@ -83,7 +81,7 @@ func (sp *Pool) execDebug(ctx context.Context, p 
*payload.Payload, stopCh chan s pld, next, errI := w.StreamIterWithContext(ctxT) cancelT() if errI != nil { - sp.log.Warn("stream error", zap.Error(err)) + sp.log.Warn("stream error", "error", errI) resp <- newPExec(nil, errI) @@ -103,7 +101,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s // non supervised execution, can potentially hang here pld, next, errI := w.StreamIter() if errI != nil { - sp.log.Warn("stream iter error", zap.Error(err)) + sp.log.Warn("stream iter error", "error", errI) // send error response resp <- newPExec(nil, errI) @@ -135,10 +133,10 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s if errD != nil { sp.log.Debug( "debug mode: worker stopped with error", - zap.String("reason", "worker error"), - zap.Int64("pid", w.Pid()), - zap.String("internal_event_name", events.EventWorkerError.String()), - zap.Error(errD), + "reason", "worker error", + "pid", w.Pid(), + "internal_event_name", events.EventWorkerError.String(), + "error", errD, ) } diff --git a/pool/static_pool/debug_test.go b/pool/static_pool/debug_test.go new file mode 100644 index 0000000..526f5c1 --- /dev/null +++ b/pool/static_pool/debug_test.go @@ -0,0 +1,75 @@ +package static_pool + +import ( + "os/exec" + "testing" + "time" + + "log/slog" + + "github.com/roadrunner-server/pool/v2/ipc/pipe" + "github.com/roadrunner-server/pool/v2/payload" + "github.com/roadrunner-server/pool/v2/pool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExecDebug_NonStream(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + Debug: true, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) 
+ + // Debug mode should have zero pre-allocated workers + assert.Empty(t, p.Workers()) + + // Execute request — goes through execDebug (non-stream branch) + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: []byte("")}, make(chan struct{})) + require.NoError(t, err) + + resp := <-r + assert.Equal(t, []byte("hello"), resp.Body()) + assert.NoError(t, resp.Error()) + + // Worker should be destroyed after response — no workers in pool + assert.Empty(t, p.Workers()) +} + +func TestExecDebug_FreshWorkerPerRequest(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + Debug: true, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + // Each request in debug mode should use a fresh worker (different PID) + pids := make(map[string]struct{}) + for range 3 { + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + pids[string(resp.Body())] = struct{}{} + } + + // All PIDs should be different — each request creates a new worker + assert.Len(t, pids, 3, "debug mode should spawn a fresh worker for each request") +} diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 276cda0..ab681fa 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -9,11 +9,12 @@ import ( "sync/atomic" "time" - "github.com/roadrunner-server/pool/pool" - "github.com/roadrunner-server/pool/pool/ratelimiter" - "github.com/roadrunner-server/pool/worker" - "github.com/roadrunner-server/pool/worker_watcher" - "go.uber.org/zap" + "log/slog" + + "github.com/roadrunner-server/pool/v2/pool" + 
"github.com/roadrunner-server/pool/v2/pool/ratelimiter" + "github.com/roadrunner-server/pool/v2/worker" + "github.com/roadrunner-server/pool/v2/worker_watcher" ) type dynAllocator struct { @@ -26,7 +27,7 @@ type dynAllocator struct { currAllocated atomic.Uint64 mu *sync.Mutex started atomic.Bool - log *zap.Logger + log *slog.Logger // pool ww *worker_watcher.WorkerWatcher allocator func() (*worker.Process, error) @@ -38,7 +39,7 @@ type dynAllocator struct { } func newDynAllocator( - log *zap.Logger, + log *slog.Logger, ww *worker_watcher.WorkerWatcher, alloc func() (*worker.Process, error), stopCh chan struct{}, @@ -79,9 +80,9 @@ func (da *dynAllocator) addMoreWorkers() { defer da.mu.Unlock() da.log.Debug("No free workers, trying to allocate dynamically", - zap.Duration("idle_timeout", da.idleTimeout), - zap.Uint64("max_workers", da.maxWorkers), - zap.Uint64("spawn_rate", da.spawnRate)) + "idle_timeout", da.idleTimeout, + "max_workers", da.maxWorkers, + "spawn_rate", da.spawnRate) if !da.started.Load() { // start the dynamic allocator listener @@ -92,7 +93,7 @@ func (da *dynAllocator) addMoreWorkers() { // if we already allocated max workers, we can't allocate more if da.currAllocated.Load() >= da.maxWorkers { // can't allocate more - da.log.Warn("can't allocate more workers, already allocated max workers", zap.Uint64("max_workers", da.maxWorkers)) + da.log.Warn("can't allocate more workers, already allocated max workers", "max_workers", da.maxWorkers) return } @@ -106,20 +107,20 @@ func (da *dynAllocator) addMoreWorkers() { err := da.ww.AddWorker() if err != nil { - da.log.Error("failed to allocate worker", zap.Error(err)) + da.log.Error("failed to allocate worker", "error", err) continue } // increase the number of additionally allocated options aw := da.currAllocated.Add(1) - da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", aw)) + da.log.Debug("allocated additional worker", "currently additionally allocated", aw) } - 
da.log.Debug("currently allocated", zap.Uint64("number", da.currAllocated.Load())) + da.log.Debug("currently allocated", "number", da.currAllocated.Load()) } func (da *dynAllocator) startIdleTTLListener() { - da.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", da.idleTimeout)) + da.log.Debug("starting dynamic allocator listener", "idle_timeout", da.idleTimeout) go func() { // DynamicAllocatorOpts are read-only, so we can use them without a lock triggerTTL := time.NewTicker(da.idleTimeout) @@ -137,7 +138,7 @@ func (da *dynAllocator) startIdleTTLListener() { return // when this channel is triggered, we should deallocate all dynamically allocated workers case <-triggerTTL.C: - da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) + da.log.Debug("dynamic workers TTL", "reason", "idle timeout reached") // check the last allocation time - if we had an allocation recently (within idleTimeout), we should skip deallocation lastAlloc := da.lastAllocTry.Load() if lastAlloc != nil && time.Since(*lastAlloc) < da.idleTimeout { @@ -160,7 +161,7 @@ func (da *dynAllocator) startIdleTTLListener() { } alloc := da.currAllocated.Load() - da.log.Debug("deallocating dynamically allocated workers", zap.Uint64("to_deallocate", alloc)) + da.log.Debug("deallocating dynamically allocated workers", "to_deallocate", alloc) if alloc >= da.spawnRate { // deallocate in batches @@ -176,20 +177,20 @@ func (da *dynAllocator) startIdleTTLListener() { // the only error we can get here is NoFreeWorkers, meaning all workers are busy if err != nil { // we should stop deallocation attempts - da.log.Error("failed to remove worker from the pool, stopping deallocation", zap.Error(err)) + da.log.Error("failed to remove worker from the pool, stopping deallocation", "error", err) // Don't decrement counter if removal failed - worker still exists break } // decrease the number of additionally allocated workers nw := da.currAllocated.Add(^uint64(0)) - 
da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", nw)) + da.log.Debug("deallocated additional worker", "currently additionally allocated", nw) } if da.currAllocated.Load() > 0 { // if we still have allocated workers, we should keep the listener running da.mu.Unlock() - da.log.Debug("dynamic allocator listener continuing, still have dynamically allocated workers", zap.Uint64("remaining", da.currAllocated.Load())) + da.log.Debug("dynamic allocator listener continuing, still have dynamically allocated workers", "remaining", da.currAllocated.Load()) continue } diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go index 74fb633..3398928 100644 --- a/pool/static_pool/dyn_allocator_test.go +++ b/pool/static_pool/dyn_allocator_test.go @@ -6,19 +6,15 @@ import ( "testing" "time" - "github.com/roadrunner-server/pool/ipc/pipe" - "github.com/roadrunner-server/pool/payload" - "github.com/roadrunner-server/pool/pool" + "log/slog" + + "github.com/roadrunner-server/pool/v2/ipc/pipe" + "github.com/roadrunner-server/pool/v2/payload" + "github.com/roadrunner-server/pool/v2/pool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) -var dynlog = func() *zap.Logger { - logger, _ := zap.NewDevelopment() - return logger -} - var testDynCfg = &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second * 5, @@ -34,9 +30,9 @@ func Test_DynAllocator(t *testing.T) { np, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(dynlog()), + pipe.NewPipeFactory(slog.Default()), testDynCfg, - dynlog(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, np) @@ -69,9 +65,9 @@ func Test_DynAllocatorManyReq(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "slow_req", "pipes") }, - pipe.NewPipeFactory(dynlog()), + 
pipe.NewPipeFactory(slog.Default()), testDynCfgMany, - dynlog(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, np) @@ -123,9 +119,9 @@ func Test_DynamicPool_OverMax(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), dynAllCfg, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -215,9 +211,9 @@ func Test_DynamicPool(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), dynAllCfg, - log(), + slog.Default(), ) assert.NoError(t, errp) assert.NotNil(t, p) @@ -268,9 +264,9 @@ func Test_DynamicPool_500W(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), dynAllCfg, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -323,9 +319,9 @@ func Test_DynAllocator_100Workers(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(dynlog()), + pipe.NewPipeFactory(slog.Default()), cfg, - dynlog(), + slog.Default(), ) require.NoError(t, err) require.NotNil(t, p) @@ -388,9 +384,9 @@ func Test_DynAllocator_ReallocationCycle(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(dynlog()), + pipe.NewPipeFactory(slog.Default()), cfg, - dynlog(), + slog.Default(), ) require.NoError(t, err) require.NotNil(t, p) @@ -442,3 +438,170 @@ func Test_DynAllocator_ReallocationCycle(t *testing.T) { assert.Equal(t, uint64(0), p.NumDynamic(), "cycle 2: all dynamic workers should be deallocated") assert.Len(t, p.Workers(), 2, "cycle 2: should return to base count after re-allocation") } + +// 
==================== Dynamic Allocator Edge Cases ==================== + +// Test_DynAllocator_SpawnRate_CappedByMaxWorkers verifies that spawnRate doesn't exceed maxWorkers. +// The `currAllocated >= maxWorkers` break in addMoreWorkers() spawn loop. Without it, over-allocation occurs. +func Test_DynAllocator_SpawnRate_CappedByMaxWorkers(t *testing.T) { + cfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 20, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 3, + SpawnRate: 10, // higher than maxWorkers + IdleTimeout: time.Second * 10, + }, + } + + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(slog.Default()), + cfg, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + wg := &sync.WaitGroup{} + // Fire enough requests to trigger dynamic allocation + for range 20 { + wg.Go(func() { + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + + // Wait a bit for allocation to happen + time.Sleep(time.Second * 3) + + // Dynamic workers should not exceed maxWorkers=3 + dynCount := p.NumDynamic() + assert.LessOrEqual(t, dynCount, uint64(3), + "dynamic workers should not exceed maxWorkers, got %d", dynCount) + + wg.Wait() +} + +// Test_DynAllocator_CounterConsistency_AfterFailedRemoval verifies that currAllocated +// doesn't decrement when RemoveWorker fails (all workers busy). +// The break on removal failure in startIdleTTLListener() deallocation loop prevents counter desync. 
+func Test_DynAllocator_CounterConsistency_AfterFailedRemoval(t *testing.T) { + cfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 30, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 5, + SpawnRate: 5, + IdleTimeout: time.Second * 3, + }, + } + + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/client.php", "delay", "pipes") + }, + pipe.NewPipeFactory(slog.Default()), + cfg, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + wg := &sync.WaitGroup{} + // Fire requests to trigger dynamic allocation and keep workers busy + for range 20 { + wg.Go(func() { + // 3s delay keeps workers busy during deallocation attempt + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("3000")}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + + time.Sleep(time.Second * 2) + + // Verify dynamic workers were allocated + dynBefore := p.NumDynamic() + t.Log("dynamic workers before deallocation:", dynBefore) + + wg.Wait() + + // After all requests complete, wait for deallocation + time.Sleep(time.Second * 10) + + // Counter should eventually reach 0 (all dynamic workers deallocated) + assert.Equal(t, uint64(0), p.NumDynamic(), + "all dynamic workers should be deallocated after idle timeout") +} + +// Test_DynAllocator_RateLimit_ThunderingHerd verifies that the rate limiter prevents +// over-allocation under thundering herd conditions. +// The TryAcquire() rate limiter at the top of addMoreWorkers() gates concurrent allocation. Without it, each pending request +// could trigger a spawn batch. 
+func Test_DynAllocator_RateLimit_ThunderingHerd(t *testing.T) { + cfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 30, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 5, + SpawnRate: 5, + IdleTimeout: time.Second * 10, + }, + } + + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(slog.Default()), + cfg, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + wg := &sync.WaitGroup{} + // Fire 50 simultaneous requests — thundering herd + for range 50 { + wg.Go(func() { + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + + // Wait for allocation attempts + time.Sleep(time.Second * 3) + + // Dynamic workers should not exceed maxWorkers despite 50 concurrent requests + totalWorkers := len(p.Workers()) + dynWorkers := p.NumDynamic() + + t.Log("total workers:", totalWorkers, "dynamic:", dynWorkers) + // 1 base + up to 5 dynamic = max 6 total + assert.LessOrEqual(t, totalWorkers, 6, + "total workers should not exceed base + maxDynamic, got %d", totalWorkers) + assert.LessOrEqual(t, dynWorkers, uint64(5), + "dynamic workers should not exceed maxWorkers=5, got %d", dynWorkers) + + wg.Wait() +} diff --git a/pool/static_pool/fuzz_test.go b/pool/static_pool/fuzz_test.go index 5fdd0cd..2160ecf 100644 --- a/pool/static_pool/fuzz_test.go +++ b/pool/static_pool/fuzz_test.go @@ -1,11 +1,12 @@ package static_pool import ( + "log/slog" "os/exec" "testing" - "github.com/roadrunner-server/pool/ipc/pipe" - "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/v2/ipc/pipe" + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" ) @@ -15,9 +16,9 @@ func FuzzStaticPoolEcho(f *testing.F) { p, err 
:= NewPool( f.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(f, err) assert.NotNil(f, p) diff --git a/pool/static_pool/options.go b/pool/static_pool/options.go index 42a9a8a..1dd51b9 100644 --- a/pool/static_pool/options.go +++ b/pool/static_pool/options.go @@ -1,14 +1,14 @@ package static_pool import ( - "go.uber.org/zap" + "log/slog" ) type Options func(p *Pool) -func WithLogger(z *zap.Logger) Options { +func WithLogger(logger *slog.Logger) Options { return func(p *Pool) { - p.log = z + p.log = logger } } diff --git a/pool/static_pool/options_test.go b/pool/static_pool/options_test.go new file mode 100644 index 0000000..997f189 --- /dev/null +++ b/pool/static_pool/options_test.go @@ -0,0 +1,36 @@ +package static_pool + +import ( + "log/slog" + "testing" + + "github.com/roadrunner-server/pool/v2/pool" + "github.com/stretchr/testify/assert" +) + +func TestWithLogger_SetsLogger(t *testing.T) { + logger := slog.New(slog.DiscardHandler) + p := &Pool{cfg: &pool.Config{}} + WithLogger(logger)(p) + assert.Equal(t, logger, p.log) +} + +func TestWithQueueSize_SetsMaxQueueSize(t *testing.T) { + p := &Pool{cfg: &pool.Config{}} + WithQueueSize(42)(p) + assert.Equal(t, uint64(42), p.maxQueueSize.Load()) +} + +func TestWithQueueSize_Zero(t *testing.T) { + p := &Pool{cfg: &pool.Config{}} + WithQueueSize(100)(p) + WithQueueSize(0)(p) + assert.Equal(t, uint64(0), p.maxQueueSize.Load()) +} + +func TestWithNumWorkers_SetsNumWorkers(t *testing.T) { + cfg := &pool.Config{NumWorkers: 1} + p := &Pool{cfg: cfg} + WithNumWorkers(8)(p) + assert.Equal(t, uint64(8), p.cfg.NumWorkers) +} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index e5a894f..56a00d8 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -7,15 +7,16 @@ import ( "sync/atomic" "unsafe" + "log/slog" + 
"github.com/roadrunner-server/errors" "github.com/roadrunner-server/events" - "github.com/roadrunner-server/goridge/v3/pkg/frame" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/payload" - "github.com/roadrunner-server/pool/pool" - "github.com/roadrunner-server/pool/worker" - workerWatcher "github.com/roadrunner-server/pool/worker_watcher" - "go.uber.org/zap" + "github.com/roadrunner-server/goridge/v4/pkg/frame" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/payload" + "github.com/roadrunner-server/pool/v2/pool" + "github.com/roadrunner-server/pool/v2/worker" + workerWatcher "github.com/roadrunner-server/pool/v2/worker_watcher" ) const ( @@ -28,7 +29,7 @@ type Pool struct { // pool configuration cfg *pool.Config // logger - log *zap.Logger + log *slog.Logger // worker command creator cmd pool.Command // creates and connects to workers @@ -49,7 +50,7 @@ type Pool struct { } // NewPool creates a new worker pool and task multiplexer. Pool will initialize with the configured number of workers. 
If supervisor configuration is provided -> pool will be turned into a supervisedExec mode -func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *pool.Config, log *zap.Logger, options ...Options) (*Pool, error) { +func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *pool.Config, log *slog.Logger, options ...Options) (*Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") } @@ -86,11 +87,7 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p } if p.log == nil { - var err error - p.log, err = zap.NewDevelopment() - if err != nil { - return nil, err - } + p.log = slog.Default() } // set up workers' allocator @@ -213,7 +210,7 @@ begin: } if w.MaxExecsReached() { - sp.log.Debug("requests execution limit reached, worker will be restarted", zap.Int64("pid", w.Pid()), zap.Uint64("execs", w.State().NumExecs())) + sp.log.Debug("requests execution limit reached, worker will be restarted", "pid", w.Pid(), "execs", w.State().NumExecs()) w.State().Transition(fsm.StateMaxJobsReached) } @@ -222,7 +219,7 @@ begin: switch { case errors.Is(errors.ExecTTL, err): // in this case, the worker already killed in the ExecTTL function - sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err)) + sp.log.Warn("worker stopped, and will be restarted", "reason", "execTTL timeout elapsed", "pid", w.Pid(), "internal_event_name", events.EventExecTTL.String(), "error", err) w.State().Transition(fsm.StateExecTTLReached) // worker should already be reallocated @@ -232,14 +229,14 @@ begin: in case of soft job error, we should not kill the worker; this is just an error payload from the worker. 
*/ w.State().Transition(fsm.StateReady) - sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err)) + sp.log.Warn("soft worker error", "reason", "SoftJob", "pid", w.Pid(), "internal_event_name", events.EventWorkerSoftError.String(), "error", err) sp.ww.Release(w) return nil, err case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Transition(fsm.StateErrored) - sp.log.Warn("RoadRunner can't communicate with the worker", zap.String("reason", "worker hung or process was killed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) + sp.log.Warn("RoadRunner can't communicate with the worker", "reason", "worker hung or process was killed", "pid", w.Pid(), "internal_event_name", events.EventWorkerError.String(), "error", err) // kill the worker instead of sending a net packet to it _ = w.Kill() @@ -252,7 +249,7 @@ begin: default: w.State().Transition(fsm.StateErrored) - sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err)) + sp.log.Warn("worker will be restarted", "pid", w.Pid(), "internal_event_name", events.EventWorkerDestruct.String(), "error", err) sp.ww.Release(w) return nil, err @@ -269,7 +266,7 @@ begin: switch { case rsp.Flags&frame.STREAM != 0: - sp.log.Debug("stream mode", zap.Int64("pid", w.Pid())) + sp.log.Debug("stream mode", "pid", w.Pid()) // create a channel for the stream (only if there are no errors) // we need to create a buffered channel to prevent blocking // stream buffer size should be bigger than regular, to have some payloads ready (optimization) @@ -281,7 +278,7 @@ begin: go func() { //nolint:gosec // G118 - intentional: per-iteration exec timeout must be independent of request context // would be 
called on Goexit defer func() { - sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) + sp.log.Debug("release [stream] worker", "pid", w.Pid(), "state", w.State().String()) close(resp) sp.ww.Release(w) }() @@ -291,17 +288,17 @@ begin: select { // we received a stop signal case <-stopCh: - sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) + sp.log.Debug("stream stop signal received", "pid", w.Pid(), "state", w.State().String()) ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) err = w.StreamCancel(ctxT) cancelT() if err != nil { w.State().Transition(fsm.StateErrored) - sp.log.Warn("stream cancel error", zap.Error(err)) + sp.log.Warn("stream cancel error", "error", err) } else { // successfully canceled w.State().Transition(fsm.StateReady) - sp.log.Debug("transition to the ready state", zap.String("from", w.State().String())) + sp.log.Debug("transition to the ready state", "from", w.State().String()) } runtime.Goexit() @@ -313,7 +310,7 @@ begin: pld, next, errI := w.StreamIterWithContext(ctxT) cancelT() if errI != nil { - sp.log.Warn("stream error", zap.Error(err)) + sp.log.Warn("stream error", "error", errI) resp <- newPExec(nil, errI) @@ -333,7 +330,7 @@ begin: // non supervised execution, can potentially hang here pld, next, errI := w.StreamIter() if errI != nil { - sp.log.Warn("stream iter error", zap.Error(err)) + sp.log.Warn("stream iter error", "error", errI) // send error response resp <- newPExec(nil, errI) @@ -359,7 +356,7 @@ begin: resp := make(chan *PExec, 1) // send the initial frame resp <- newPExec(rsp, nil) - sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid())) + sp.log.Debug("req-resp mode", "pid", w.Pid()) if w.State().Compare(fsm.StateWorking) { w.State().Transition(fsm.StateReady) } @@ -385,7 +382,7 @@ func (sp *Pool) NumDynamic() uint64 { // Destroy all underlying workers (but let them complete the task). 
func (sp *Pool) Destroy(ctx context.Context) { - sp.log.Info("destroy signal received", zap.Duration("timeout", sp.cfg.DestroyTimeout)) + sp.log.Info("destroy signal received", "timeout", sp.cfg.DestroyTimeout) var cancel context.CancelFunc _, ok := ctx.Deadline() if !ok { @@ -425,9 +422,9 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr if errors.Is(errors.NoFreeWorkers, err) { sp.log.Error( "no free workers in the pool, wait timeout exceed", - zap.String("reason", "no free workers"), - zap.String("internal_event_name", events.EventNoFreeWorkers.String()), - zap.Error(err), + "reason", "no free workers", + "internal_event_name", events.EventNoFreeWorkers.String(), + "error", err, ) // if we don't have a dynamic allocator or in debug mode, we can't allocate a new worker diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index 6142392..173c041 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -12,14 +12,15 @@ import ( "time" "unsafe" + "log/slog" + "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/ipc/pipe" - "github.com/roadrunner-server/pool/payload" - "github.com/roadrunner-server/pool/pool" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/ipc/pipe" + "github.com/roadrunner-server/pool/v2/payload" + "github.com/roadrunner-server/pool/v2/pool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) var testCfg = &pool.Config{ @@ -28,18 +29,13 @@ var testCfg = &pool.Config{ DestroyTimeout: time.Second * 500, } -var log = func() *zap.Logger { - logger, _ := zap.NewDevelopment() - return logger -} - func Test_NewPool(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), 
testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -64,9 +60,9 @@ func Test_MaxWorkers(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), dynAllCfg, - log(), + slog.Default(), ) assert.Error(t, err) assert.Nil(t, p) @@ -82,9 +78,9 @@ func Test_NewPoolAddRemoveWorkers(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg2, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -117,7 +113,7 @@ func Test_StaticPool_NilFactory(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, nil, testCfg, - log(), + slog.Default(), ) assert.Error(t, err) assert.Nil(t, p) @@ -127,9 +123,9 @@ func Test_StaticPool_NilConfig(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), nil, - log(), + slog.Default(), ) assert.Error(t, err) assert.Nil(t, p) @@ -139,9 +135,9 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -163,9 +159,9 @@ func Test_StaticPool_RemoveWorker(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg2, - log(), + 
slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -206,9 +202,9 @@ func Test_Pool_Reallocate(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg2, - log(), + slog.Default(), ) require.NoError(t, err) require.NotNil(t, p) @@ -247,9 +243,9 @@ func Test_NewPoolReset(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg2, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -299,9 +295,9 @@ func Test_StaticPool_Invalid(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.Nil(t, p) @@ -312,12 +308,12 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NotNil(t, p) @@ -341,9 +337,9 @@ func Test_StaticPool_QueueSizeLimit(t *testing.T) { ctx, // sleep for 10 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg2, - log(), + slog.Default(), WithQueueSize(1), ) require.NoError(t, err) @@ -382,9 +378,9 @@ func Test_StaticPool_Echo(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", 
"pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -409,9 +405,9 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -437,9 +433,9 @@ func Test_StaticPool_Echo_Context(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -464,9 +460,9 @@ func Test_StaticPool_JobError(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -489,15 +485,12 @@ func Test_StaticPool_JobError(t *testing.T) { } func Test_StaticPool_Broken_Replace(t *testing.T) { - z, err := zap.NewProduction() - require.NoError(t, err) - p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - z, + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -522,7 +515,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfg2, nil, ) @@ -567,13 +560,13 @@ func Test_StaticPool_AllocateTimeout(t 
*testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, }, - log(), + slog.Default(), ) assert.Error(t, err) if !errors.Is(errors.WorkerAllocate, err) { @@ -586,14 +579,14 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -634,13 +627,13 @@ func Test_StaticPool_DebugAddRemove(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -679,13 +672,13 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -729,13 +722,13 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log()), + 
pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NotNil(t, p) @@ -754,13 +747,13 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NotNil(t, p) @@ -788,13 +781,13 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -817,13 +810,13 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -836,7 +829,7 @@ func Test_StaticPool_ResetTimeout(t *testing.T) { t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: false, NumWorkers: 2, @@ -844,7 +837,7 @@ func Test_StaticPool_ResetTimeout(t *testing.T) { DestroyTimeout: time.Second * 100, ResetTimeout: time.Second * 3, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -866,7 +859,7 @@ func 
Test_StaticPool_NoFreeWorkers(t *testing.T) { t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: false, NumWorkers: 1, @@ -874,7 +867,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -897,13 +890,13 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -945,7 +938,7 @@ func Test_StaticPool_QueueSize(t *testing.T) { t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep_short.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: false, NumWorkers: 1, @@ -953,7 +946,7 @@ func Test_StaticPool_QueueSize(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -978,13 +971,13 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.Error(t, err) @@ -996,13 +989,13 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "", 
"echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.Error(t, err) @@ -1013,9 +1006,9 @@ func Test_CRC_WithPayload(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/crc_error.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) assert.Error(t, err) data := err.Error() @@ -1047,9 +1040,9 @@ func Benchmark_Pool_Echo(b *testing.B) { p, err := NewPool( b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), testCfg, - log(), + slog.Default(), ) if err != nil { b.Fatal(err) @@ -1078,13 +1071,13 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, err := NewPool( b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: uint64(runtime.NumCPU()), //nolint:gosec AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(b, err) b.Cleanup(func() { @@ -1122,14 +1115,14 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, err := NewPool( b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, - log(), + slog.Default(), ) assert.NoError(b, err) b.Cleanup(func() { @@ -1182,3 +1175,173 @@ func BenchmarkToStringSafe(b *testing.B) { func toStringNotFun(data []byte) string { return string(data) } + +// ==================== Pool 
Lifecycle Edge Cases ==================== + +// TestPool_ExecAfterDestroy verifies that Exec after Destroy returns a clean error, not a panic. +// In production, HTTP handlers may hold pool references after config-reload triggers Destroy. +func TestPool_ExecAfterDestroy(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + + // Verify pool works before destroy + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + assert.Equal(t, []byte("hello"), resp.Body()) + + // Destroy the pool + p.Destroy(t.Context()) + + // Exec after Destroy — must return WatcherStopped error, not panic + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.Error(t, err) + assert.True(t, errors.Is(errors.WatcherStopped, err), "expected WatcherStopped, got: %v", err) +} + +// TestPool_MaxQueueSize_Zero_AllowsRequests verifies that QueueSize=0 means unlimited. +// The `maxQueueSize != 0` guard in Exec() is essential. +// Without it, 0 >= 0 would block ALL requests. 
+func TestPool_MaxQueueSize_Zero_AllowsRequests(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: uint64(runtime.NumCPU()), //nolint:gosec + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + }, + slog.Default(), + WithQueueSize(0), // explicitly zero + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + // All 10 sequential requests should succeed + for range 10 { + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + assert.Equal(t, []byte("hello"), resp.Body()) + } +} + +// TestPool_Reset_WhileExecInProgress verifies Reset works while an Exec is in progress. +// This is the SIGHUP / config reload scenario. Reset must wait for in-flight work. +func TestPool_Reset_WhileExecInProgress(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep_short.php") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + ResetTimeout: time.Second * 10, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + // Get initial worker PID + workers := p.Workers() + require.Len(t, workers, 1) + pidBefore := workers[0].Pid() + + // Start a slow exec in the background + done := make(chan struct{}) + go func() { + defer close(done) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + }() + + // Give the exec time to start + time.Sleep(time.Millisecond * 500) + + // Reset while exec is in progress + err = p.Reset(t.Context()) + require.NoError(t, err) + + // Wait for in-flight exec to complete 
+ <-done + + // Pool should work with a new worker + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + assert.NotNil(t, resp.Body()) + + // New worker should have different PID + workers2 := p.Workers() + require.Len(t, workers2, 1) + assert.NotEqual(t, pidBefore, workers2[0].Pid()) +} + +// TestPool_ConcurrentExec_MaxJobs1 verifies the pool works correctly when workers are replaced after each job. +// MaxJobs=1 triggers replacement after every request; the 10 requests below are issued sequentially, so each Exec must obtain a freshly allocated replacement worker. If that replacement races with the next Exec, the pool deadlocks. +func TestPool_ConcurrentExec_MaxJobs1(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 2, + MaxJobs: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + time.Sleep(time.Second) + + // Track PIDs across sequential requests + seenPIDs := make(map[string]struct{}) + + for range 10 { + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + require.NotNil(t, resp.Body()) + seenPIDs[string(resp.Body())] = struct{}{} + } + + // With MaxJobs=1, workers get replaced — we should see multiple different PIDs + assert.Greater(t, len(seenPIDs), 1, "should see different PIDs due to MaxJobs=1 replacement") +} + +// TestPool_ExecEmptyPayload verifies that Exec with empty payload returns a clear error.
+func TestPool_ExecEmptyPayload(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + testCfg, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + _, err = p.Exec(t.Context(), &payload.Payload{Body: nil, Context: nil}, make(chan struct{})) + require.Error(t, err) + assert.Contains(t, err.Error(), "payload can not be empty") +} diff --git a/pool/static_pool/stream.go b/pool/static_pool/stream.go index 3b58aae..82ec029 100644 --- a/pool/static_pool/stream.go +++ b/pool/static_pool/stream.go @@ -1,6 +1,6 @@ package static_pool -import "github.com/roadrunner-server/pool/payload" +import "github.com/roadrunner-server/pool/v2/payload" type PExec struct { pld *payload.Payload diff --git a/pool/static_pool/supervisor.go b/pool/static_pool/supervisor.go index 4a8d9d4..48eeca0 100644 --- a/pool/static_pool/supervisor.go +++ b/pool/static_pool/supervisor.go @@ -4,9 +4,8 @@ import ( "time" "github.com/roadrunner-server/events" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/state/process" - "go.uber.org/zap" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/state/process" ) const ( @@ -98,7 +97,7 @@ func (sp *Pool) control() { workers[i].State().Transition(fsm.StateInvalid) } - sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String())) + sp.log.Debug("ttl", "reason", "ttl is reached", "pid", workers[i].Pid(), "internal_event_name", events.EventTTL.String()) continue } @@ -120,7 +119,7 @@ func (sp *Pool) control() { workers[i].State().Transition(fsm.StateInvalid) } - sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), 
zap.String("internal_event_name", events.EventMaxMemory.String())) + sp.log.Debug("memory_limit", "reason", "max memory is reached", "pid", workers[i].Pid(), "internal_event_name", events.EventMaxMemory.String()) continue } @@ -170,7 +169,7 @@ func (sp *Pool) control() { */ workers[i].State().Transition(fsm.StateIdleTTLReached) - sp.log.Debug("idle_ttl", zap.String("reason", "idle ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String())) + sp.log.Debug("idle_ttl", "reason", "idle ttl is reached", "pid", workers[i].Pid(), "internal_event_name", events.EventTTL.String()) } } } diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index de77ef4..1428496 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -2,15 +2,16 @@ package static_pool import ( "context" + "log/slog" "os" "os/exec" "testing" "time" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/ipc/pipe" - "github.com/roadrunner-server/pool/payload" - "github.com/roadrunner-server/pool/pool" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/ipc/pipe" + "github.com/roadrunner-server/pool/v2/payload" + "github.com/roadrunner-server/pool/v2/pool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -32,9 +33,9 @@ func Test_SupervisedPool_Exec(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgSupervised, - log(), + slog.Default(), ) require.NoError(t, err) @@ -62,9 +63,9 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), 
cfgSupervised, - log(), + slog.Default(), ) require.NoError(t, err) @@ -92,7 +93,7 @@ func Test_SupervisedPool_ImmediateDestroy(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, @@ -104,7 +105,7 @@ func Test_SupervisedPool_ImmediateDestroy(t *testing.T) { MaxWorkerMemory: 100, }, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -121,9 +122,9 @@ func Test_SupervisedPool_NilFactory(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), nil, - log(), + slog.Default(), ) assert.Error(t, err) assert.Nil(t, p) @@ -135,7 +136,7 @@ func Test_SupervisedPool_NilConfig(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, nil, cfgSupervised, - log(), + slog.Default(), ) assert.Error(t, err) assert.Nil(t, p) @@ -145,9 +146,9 @@ func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgSupervised, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -168,9 +169,9 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgSupervised, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -201,9 +202,9 @@ func 
Test_SupervisedPoolReset(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgSupervised, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -230,7 +231,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: true, NumWorkers: uint64(1), @@ -244,7 +245,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { MaxWorkerMemory: 100, }, }, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -279,9 +280,9 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -312,9 +313,9 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -372,9 +373,9 @@ func TestSupervisedPool_Idle(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -422,9 +423,9 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", 
"../../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -473,9 +474,9 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -520,9 +521,9 @@ func TestSupervisedPool_ShouldRespond(t *testing.T) { func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/should-not-be-killed.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -563,9 +564,9 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -590,9 +591,9 @@ func Test_SupervisedPool_FastCancel(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgSupervised, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -620,9 +621,9 @@ func Test_SupervisedPool_AllocateFailedOK(t *testing.T) { p, err := NewPool( t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), cfgExecTTL, - log(), + slog.Default(), ) assert.NoError(t, err) @@ -659,7 +660,7 @@ func Test_SupervisedPool_NoFreeWorkers(t *testing.T) { t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return 
exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log()), + pipe.NewPipeFactory(slog.Default()), &pool.Config{ Debug: false, NumWorkers: 1, @@ -667,7 +668,7 @@ func Test_SupervisedPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: &pool.SupervisorConfig{}, }, - log(), + slog.Default(), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -685,3 +686,121 @@ func Test_SupervisedPool_NoFreeWorkers(t *testing.T) { time.Sleep(time.Second) t.Cleanup(func() { p.Destroy(t.Context()) }) } + +// ==================== Supervisor Edge Cases ==================== + +// TestSupervisor_TTL_WorkingWorker_GetsInvalid verifies that a working worker gets StateInvalid +// (not StateTTLReached) when TTL expires during request execution. +// The TTL branch in control(): if worker is Ready → StateTTLReached, else → StateInvalid (graceful). +func TestSupervisor_TTL_WorkingWorker_GetsInvalid(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &pool.SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 2 * time.Second, + }, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + // Start a long exec (sleep.php sleeps for 300s) + go func() { + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: []byte("")}, make(chan struct{})) + }() + + // Wait for TTL to be reached while worker is busy + time.Sleep(time.Second * 4) + + // Worker should be marked as Invalid (not TTLReached) + workers := p.Workers() + require.NotEmpty(t, workers) + state := workers[0].State().CurrentState() + // Worker should either be Invalid (marked by supervisor) or already replaced + // If replaced, it should be in Ready state with a 
different PID + assert.True(t, + state == fsm.StateInvalid || state == fsm.StateReady || state == fsm.StateWorking, + "expected Invalid/Ready/Working, got %d", state) +} + +// TestSupervisor_IdleTTL_SkipsNeverUsedWorker verifies that workers with LastUsed==0 +// (never executed a request) are not killed by idle TTL check. +// On pool startup, all workers have LastUsed=0 — idle check must skip them. +func TestSupervisor_IdleTTL_SkipsNeverUsedWorker(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + Supervisor: &pool.SupervisorConfig{ + WatchTick: 1 * time.Second, + IdleTTL: 1 * time.Second, + }, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + pid := p.Workers()[0].Pid() + + // Wait 3 seconds WITHOUT executing any request + // The idle TTL check should skip this worker because LastUsed==0 + time.Sleep(time.Second * 3) + + workers := p.Workers() + require.Len(t, workers, 1) + // Worker should still be alive with the same PID (not replaced) + assert.Equal(t, pid, workers[0].Pid(), "never-used worker should not be killed by idle TTL") +} + +// TestSupervisor_MemoryCheck_WorkingWorkerGetsInvalid verifies that a working worker +// exceeding memory gets StateInvalid (not StateMaxMemoryReached). +// Same pattern as TTL in control(): killing a working worker corrupts in-flight response. 
+func TestSupervisor_MemoryCheck_WorkingWorkerGetsInvalid(t *testing.T) { + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php") }, + pipe.NewPipeFactory(slog.Default()), + &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &pool.SupervisorConfig{ + WatchTick: 1 * time.Second, + MaxWorkerMemory: 1, // 1 MB — very low, will be exceeded + }, + }, + slog.Default(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + + // Start a long exec while worker is busy + go func() { + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: []byte("")}, make(chan struct{})) + }() + + time.Sleep(time.Second * 3) + + // Worker should be marked Invalid while working (not MaxMemoryReached) + workers := p.Workers() + require.NotEmpty(t, workers) + state := workers[0].State().CurrentState() + assert.True(t, + state == fsm.StateInvalid || state == fsm.StateReady || state == fsm.StateWorking, + "expected Invalid/Ready/Working, got %d", state) +} diff --git a/state/process/state.go b/state/process/state.go index 47ac77b..292a5e1 100644 --- a/state/process/state.go +++ b/state/process/state.go @@ -2,7 +2,7 @@ package process import ( "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/worker" + "github.com/roadrunner-server/pool/v2/worker" "github.com/shirou/gopsutil/process" ) diff --git a/worker/options.go b/worker/options.go index e8e5330..6b74825 100644 --- a/worker/options.go +++ b/worker/options.go @@ -2,9 +2,8 @@ package worker import ( "crypto/rand" + "log/slog" "math/big" - - "go.uber.org/zap" ) const ( @@ -13,7 +12,7 @@ const ( type Options func(p *Process) -func WithLog(z *zap.Logger) Options { +func WithLog(z *slog.Logger) Options { return func(p *Process) { p.log = z } @@ -25,7 +24,7 @@ func WithMaxExecs(maxExecs uint64) Options { } } -func 
calculateMaxExecsJitter(maxExecs, jitter uint64, log *zap.Logger) uint64 { +func calculateMaxExecsJitter(maxExecs, jitter uint64, log *slog.Logger) uint64 { if maxExecs == 0 { return 0 } @@ -33,7 +32,9 @@ func calculateMaxExecsJitter(maxExecs, jitter uint64, log *zap.Logger) uint64 { random, err := rand.Int(rand.Reader, big.NewInt(int64(jitter))) //nolint:gosec if err != nil { - log.Debug("jitter calculation error", zap.Error(err), zap.Uint64("jitter", jitter)) + if log != nil { + log.Debug("jitter calculation error", "error", err, "jitter", jitter) + } return maxExecs } diff --git a/worker/worker.go b/worker/worker.go index a1916eb..6eaf2d5 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -6,6 +6,7 @@ import ( stderr "errors" "fmt" "io" + "log/slog" "os" "os/exec" "runtime" @@ -15,19 +16,18 @@ import ( "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/goridge/v3/pkg/frame" - "github.com/roadrunner-server/goridge/v3/pkg/relay" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/internal" - "github.com/roadrunner-server/pool/payload" - "go.uber.org/zap" + "github.com/roadrunner-server/goridge/v4/pkg/frame" + "github.com/roadrunner-server/goridge/v4/pkg/relay" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/internal" + "github.com/roadrunner-server/pool/v2/payload" ) // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. 
created time.Time - log *zap.Logger + log *slog.Logger // calculated maximum value with jitter maxExecs uint64 @@ -98,12 +98,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { } if w.log == nil { - z, err := zap.NewProduction() - if err != nil { - return nil, err - } - - w.log = z + w.log = slog.Default() } w.fsm = fsm.NewFSM(fsm.StateInactive, w.log) @@ -120,7 +115,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { buf := make([]byte, 65536) errCopy := copyBuffer(w, rc, buf) if errCopy != nil { - w.log.Debug("stderr", zap.Error(errCopy)) + w.log.Debug("stderr", "error", errCopy) } }() @@ -255,7 +250,7 @@ func (w *Process) StreamIterWithContext(ctx context.Context) (*payload.Payload, err: err, } - w.log.Debug("stream iter error", zap.Int64("pid", w.Pid()), zap.Error(err)) + w.log.Debug("stream iter error", "pid", w.Pid(), "error", err) // trash response rsp = nil runtime.Goexit() @@ -304,7 +299,7 @@ func (w *Process) StreamCancel(ctx context.Context) error { return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) } - w.log.Debug("stream was canceled, sending stop bit", zap.Int64("pid", w.Pid())) + w.log.Debug("stream was canceled, sending stop bit", "pid", w.Pid()) // get a frame fr := w.getFrame() @@ -323,7 +318,7 @@ func (w *Process) StreamCancel(ctx context.Context) error { w.putFrame(fr) c := w.getCh() - w.log.Debug("stop bit was sent, waiting for the response", zap.Int64("pid", w.Pid())) + w.log.Debug("stop bit was sent, waiting for the response", "pid", w.Pid()) go func() { for { @@ -333,7 +328,7 @@ func (w *Process) StreamCancel(ctx context.Context) error { err: errrf, } - w.log.Debug("stream cancel error", zap.Int64("pid", w.Pid()), zap.Error(errrf)) + w.log.Debug("stream cancel error", "pid", w.Pid(), "error", errrf) // trash response rsp = nil runtime.Goexit() @@ -341,7 +336,7 @@ func (w *Process) StreamCancel(ctx context.Context) error { // stream has 
ended if rsp.Flags&frame.STREAM == 0 { - w.log.Debug("stream has ended", zap.Int64("pid", w.Pid())) + w.log.Debug("stream has ended", "pid", w.Pid()) c <- &wexec{} // trash response rsp = nil @@ -435,7 +430,7 @@ func (w *Process) Stop() error { w.fsm.Transition(fsm.StateStopping) go func() { - w.log.Debug("sending stop request to the worker", zap.Int("pid", w.pid)) + w.log.Debug("sending stop request to the worker", "pid", w.pid) err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) if err == nil { w.fsm.Transition(fsm.StateStopped) @@ -447,11 +442,11 @@ func (w *Process) Stop() error { // If we successfully sent a stop request, Wait() method will send a struct{} to the doneCh and we're done here // otherwise we have 10 seconds before we kill the process case <-w.doneCh: - w.log.Debug("worker stopped", zap.Int("pid", w.pid)) + w.log.Debug("worker stopped", "pid", w.pid) return nil case <-time.After(time.Second * 10): // kill process - w.log.Warn("worker doesn't respond on stop command, killing process", zap.Int64("PID", w.Pid())) + w.log.Warn("worker doesn't respond on stop command, killing process", "pid", w.Pid()) _ = w.cmd.Process.Signal(os.Kill) w.fsm.Transition(fsm.StateStopped) return errors.E(op, errors.Network) diff --git a/worker/worker_test.go b/worker/worker_test.go index d7da6bc..4d7be03 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -4,7 +4,7 @@ import ( "os/exec" "testing" - "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/v2/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 29cd707..365dd4b 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -7,8 +7,8 @@ import ( "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/worker" + 
"github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/worker" ) type Vec struct { @@ -59,6 +59,7 @@ func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { for v.reset.Load() { select { case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) default: time.Sleep(time.Millisecond * 10) } diff --git a/worker_watcher/container/channel/vec_test.go b/worker_watcher/container/channel/vec_test.go index 62045e6..bde4d5b 100644 --- a/worker_watcher/container/channel/vec_test.go +++ b/worker_watcher/container/channel/vec_test.go @@ -3,11 +3,13 @@ package channel import ( "context" "os/exec" + "sync" "testing" "time" "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/worker" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -21,6 +23,41 @@ func createTestWorker(t *testing.T) *worker.Process { return w } +// createStartedTestWorker creates a worker with a running process (so Kill() won't panic on nil Process) +func createStartedTestWorker(t *testing.T) *worker.Process { + t.Helper() + cmd := exec.Command("sleep", "100") + w, err := worker.InitBaseWorker(cmd) + require.NoError(t, err) + require.NoError(t, cmd.Start()) + t.Cleanup(func() { _ = cmd.Process.Kill(); _ = cmd.Wait() }) + return w +} + +// createTestWorkerInState creates a started worker in a specific FSM state. +// Workers must be started so that Kill() doesn't panic on nil Process. 
+func createTestWorkerInState(t *testing.T, state int64) *worker.Process { + t.Helper() + w := createStartedTestWorker(t) + // Workers start in StateInactive; transition through valid paths + switch state { + case fsm.StateReady: + w.State().Transition(fsm.StateReady) + case fsm.StateWorking: + w.State().Transition(fsm.StateReady) + w.State().Transition(fsm.StateWorking) + case fsm.StateErrored: + w.State().Transition(fsm.StateErrored) + case fsm.StateTTLReached: + w.State().Transition(fsm.StateReady) + w.State().Transition(fsm.StateTTLReached) + case fsm.StateInvalid: + w.State().Transition(fsm.StateReady) + w.State().Transition(fsm.StateInvalid) + } + return w +} + // ==================== General Tests ==================== // Test_Vec_PushPop tests basic push and pop operations @@ -120,3 +157,171 @@ func Test_Vec_PopContextTimeout(t *testing.T) { // Verify it actually waited for the timeout assert.True(t, elapsed >= 50*time.Millisecond, "should have waited for timeout") } + +// ==================== Additional Tests ==================== + +// Test_Vec_PopDuringReset_BlocksThenUnblocks verifies Pop blocks during Reset and unblocks after ResetDone. +// If Pop succeeds during reset, a worker can be popped and simultaneously killed by the reset path. 
+func Test_Vec_PopDuringReset_BlocksThenUnblocks(t *testing.T) { + vec := NewVector() + for range 3 { + vec.Push(createTestWorker(t)) + } + + // Signal reset — Pop should now spin-wait + vec.Reset() + + type popResult struct { + w *worker.Process + err error + } + result := make(chan popResult, 1) + + // Start Pop in a goroutine — it should block in the reset spin-loop + go func() { + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + w, err := vec.Pop(ctx) + result <- popResult{w, err} + }() + + // Verify Pop is blocked (hasn't returned after 300ms) + select { + case <-result: + t.Fatal("Pop should be blocked during reset") + case <-time.After(300 * time.Millisecond): + // expected — Pop is blocked in the reset spin-loop + } + + // Complete reset and push fresh workers + vec.ResetDone() + for range 3 { + vec.Push(createTestWorker(t)) + } + + // Pop should now unblock and succeed + select { + case r := <-result: + require.NoError(t, r.err) + assert.NotNil(t, r.w) + case <-time.After(5 * time.Second): + t.Fatal("Pop should have unblocked after ResetDone") + } +} + +// Test_Vec_PopDuringReset_ContextCancel verifies Pop returns NoFreeWorkers when context +// expires during the reset spin-loop (instead of hanging forever). +func Test_Vec_PopDuringReset_ContextCancel(t *testing.T) { + vec := NewVector() + for range 3 { + vec.Push(createTestWorker(t)) + } + + // Signal reset — Pop should now spin-wait + vec.Reset() + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + defer cancel() + + start := time.Now() + w, err := vec.Pop(ctx) + elapsed := time.Since(start) + + assert.Nil(t, w) + require.Error(t, err) + assert.True(t, errors.Is(errors.NoFreeWorkers, err)) + assert.Less(t, elapsed, 2*time.Second, "should exit promptly when context expires") + + // Cleanup: complete the reset so vec isn't stuck + vec.ResetDone() +} + +// Test_Vec_PushToFullChannel verifies that when the channel is full, overflow workers are killed. 
+// Production safety valve — without it, Push goroutine hangs forever on a full channel. +func Test_Vec_PushToFullChannel(t *testing.T) { + vec := NewVector() + + // Fill channel to capacity (2048) + for range 2048 { + vec.Push(createTestWorker(t)) + } + assert.Equal(t, 2048, vec.Len()) + + // Push one more — should hit the default branch and kill the worker (not block). + // Must be a started worker since Push calls Kill() on overflow. + done := make(chan struct{}) + go func() { + vec.Push(createStartedTestWorker(t)) + close(done) + }() + + select { + case <-done: + // Push returned — overflow worker was killed, channel still at capacity + assert.Equal(t, 2048, vec.Len()) + case <-time.After(2 * time.Second): + t.Fatal("Push blocked on full channel — safety valve broken") + } +} + +// Test_Vec_Remove_DrainsBadWorkers verifies that Remove drains workers in bad states. +// Remove is called from the worker death callback. If it fails, the pool slowly loses capacity. +func Test_Vec_Remove_DrainsBadWorkers(t *testing.T) { + vec := NewVector() + + // Push 5 workers: 3 Ready, 2 Errored + for range 3 { + w := createTestWorkerInState(t, fsm.StateReady) + vec.Push(w) + } + for range 2 { + w := createTestWorkerInState(t, fsm.StateErrored) + vec.Push(w) + } + assert.Equal(t, 5, vec.Len()) + + // Remove should drain bad workers + vec.Remove() + + // Only Ready workers should remain + assert.Equal(t, 3, vec.Len()) +} + +// Test_Vec_ConcurrentPushPop verifies no panics or deadlocks under concurrent Push/Pop. +// In production, Exec releases workers (Push) while new Exec calls take workers (Pop) continuously. 
+func Test_Vec_ConcurrentPushPop(t *testing.T) { + vec := NewVector() + const producers = 5 + const consumers = 5 + const opsPerGoroutine = 50 + + // Seed with some workers so consumers don't starve + for range 20 { + vec.Push(createTestWorker(t)) + } + + var wg sync.WaitGroup + + // Producer goroutines + for range producers { + wg.Go(func() { + for range opsPerGoroutine { + vec.Push(createTestWorker(t)) + } + }) + } + + // Consumer goroutines + for range consumers { + wg.Go(func() { + for range opsPerGoroutine { + popCtx, popCancel := context.WithTimeout(t.Context(), 100*time.Millisecond) + _, _ = vec.Pop(popCtx) + popCancel() + } + }) + } + + wg.Wait() + assert.GreaterOrEqual(t, vec.Len(), 0) +} diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index cd0b2ff..85b93d4 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -7,12 +7,13 @@ import ( "sync/atomic" "time" + "log/slog" + "github.com/roadrunner-server/errors" "github.com/roadrunner-server/events" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/worker" - "github.com/roadrunner-server/pool/worker_watcher/container/channel" - "go.uber.org/zap" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/worker" + "github.com/roadrunner-server/pool/v2/worker_watcher/container/channel" ) const maxWorkers = 2048 @@ -32,7 +33,7 @@ type WorkerWatcher struct { workers sync.Map // workers map[int64]*worker.Process - log *zap.Logger + log *slog.Logger allocator Allocator allocateTimeout time.Duration @@ -40,7 +41,7 @@ type WorkerWatcher struct { } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { +func NewSyncWorkerWatcher(allocator Allocator, log *slog.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { eb, _ := events.NewEventBus() ww := 
&WorkerWatcher{ container: channel.NewVector(), @@ -171,7 +172,7 @@ func (ww *WorkerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.log.Error("allocate", zap.Error(err)) + ww.log.Error("allocate", "error", err) // if no timeout, return the error immediately if ww.allocateTimeout == 0 { return errors.E(op, errors.WorkerAllocate, err) @@ -192,7 +193,7 @@ func (ww *WorkerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.log.Error("allocate retry attempt failed", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) + ww.log.Error("allocate retry attempt failed", "internal_event_name", events.EventWorkerError.String(), "error", err) continue } @@ -212,7 +213,7 @@ done: ww.addToWatch(sw) // add a new worker to the worker's slice (to get information about workers in parallel) if w, ok := ww.workers.LoadAndDelete(sw.Pid()); ok { - ww.log.Warn("allocated worker already exists, killing duplicate, report this case", zap.Int64("pid", sw.Pid())) + ww.log.Warn("allocated worker already exists, killing duplicate, report this case", "pid", sw.Pid()) _ = w.(*worker.Process).Kill() } @@ -243,7 +244,7 @@ func (ww *WorkerWatcher) Release(w *worker.Process) { err := w.Stop() if err != nil { - ww.log.Debug("worker release", zap.Error(err)) + ww.log.Debug("worker release", "error", err) } default: // in all other cases, we have no choice rather than kill the worker @@ -376,7 +377,7 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { return case <-ctx.Done(): // kill workers - ww.log.Debug("destroy: context canceled", zap.Error(ctx.Err())) + ww.log.Debug("destroy: context canceled", "error", ctx.Err()) ww.mu.Lock() wg := &sync.WaitGroup{} ww.workers.Range(func(key, value any) bool { @@ -419,7 +420,7 @@ func (ww *WorkerWatcher) List() []*worker.Process { func (ww *WorkerWatcher) wait(w *worker.Process) { err := w.Wait() if err != nil { - ww.log.Debug("worker stopped", 
zap.String("internal_event_name", events.EventWorkerWaitExit.String()), zap.Error(err)) + ww.log.Debug("worker stopped", "internal_event_name", events.EventWorkerWaitExit.String(), "error", err) } // remove worker @@ -427,7 +428,11 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { if w.State().Compare(fsm.StateDestroyed) { // worker was manually destroyed, no need to replace - ww.log.Debug("worker destroyed", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err)) + if err != nil { + ww.log.Debug("worker destroyed", "pid", w.Pid(), "internal_event_name", events.EventWorkerDestruct.String(), "error", err) + } else { + ww.log.Debug("worker destroyed", "pid", w.Pid(), "internal_event_name", events.EventWorkerDestruct.String()) + } return } @@ -439,7 +444,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { } ww.numWorkers.Add(^uint64(0)) // dead worker was not replaced - ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) + ww.log.Error("failed to allocate the worker", "internal_event_name", events.EventWorkerError.String(), "error", err) if ww.numWorkers.Load() == 0 { panic("no workers available, can't run the application") } diff --git a/worker_watcher/worker_watcher_test.go b/worker_watcher/worker_watcher_test.go new file mode 100644 index 0000000..c63e984 --- /dev/null +++ b/worker_watcher/worker_watcher_test.go @@ -0,0 +1,282 @@ +package worker_watcher + +import ( + "context" + "fmt" + "log/slog" + "os/exec" + "sync/atomic" + "testing" + "time" + + "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/pool/v2/fsm" + "github.com/roadrunner-server/pool/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// alwaysFailAllocator returns an allocator that always fails. 
+func alwaysFailAllocator() Allocator { + return func() (*worker.Process, error) { + return nil, fmt.Errorf("permanent allocation failure") + } +} + +// fakeAllocator returns an allocator that fails failCount times then succeeds. +// Counter semantics: Store(2) means "fail twice" because Add(-1) returns the new value, +// so call 1 returns 1 (>=0, fail), call 2 returns 0 (>=0, fail), call 3 returns -1 (<0, succeed). +// Workers are created with a running "sleep" process so Kill won't panic. +// Note: workers won't have a relay, so Stop() will panic — tests must avoid Stop paths. +func fakeAllocator(t *testing.T, failCount *atomic.Int32) Allocator { + t.Helper() + return func() (*worker.Process, error) { + if failCount.Add(-1) >= 0 { + return nil, fmt.Errorf("simulated allocation failure") + } + cmd := exec.Command("sleep", "100") + w, err := worker.InitBaseWorker(cmd) + if err != nil { + return nil, err + } + if err = cmd.Start(); err != nil { + return nil, err + } + t.Cleanup(func() { + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + }) + w.State().Transition(fsm.StateReady) + return w, nil + } +} + +// createStartedWorker creates a worker with a running process in the specified state. +// Note: no relay attached — Kill() works but Stop() will panic. 
+func createStartedWorker(t *testing.T, state int64) *worker.Process { + t.Helper() + cmd := exec.Command("sleep", "100") + w, err := worker.InitBaseWorker(cmd) + require.NoError(t, err) + require.NoError(t, cmd.Start()) + t.Cleanup(func() { _ = cmd.Process.Kill(); _ = cmd.Wait() }) + + switch state { + case fsm.StateReady: + w.State().Transition(fsm.StateReady) + case fsm.StateWorking: + w.State().Transition(fsm.StateReady) + w.State().Transition(fsm.StateWorking) + case fsm.StateErrored: + w.State().Transition(fsm.StateErrored) + case fsm.StateTTLReached: + w.State().Transition(fsm.StateReady) + w.State().Transition(fsm.StateTTLReached) + } + return w +} + +// shutdownWatcher signals the watcher to stop (without calling Destroy which needs relay). +func shutdownWatcher(ww *WorkerWatcher) { + ww.container.Destroy() + select { + case ww.stopCh <- struct{}{}: + default: + } +} + +// TestWorkerWatcher_AllocateRetryTimeout verifies that Allocate returns a WorkerAllocate error +// after the allocateTimeout when allocation always fails. +// If the retry loop hangs, the ww state is corrupted and the pool deadlocks. +func TestWorkerWatcher_AllocateRetryTimeout(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 3*time.Second) + t.Cleanup(func() { shutdownWatcher(ww) }) + + start := time.Now() + err := ww.Allocate() + elapsed := time.Since(start) + + require.Error(t, err) + assert.True(t, errors.Is(errors.WorkerAllocate, err)) + assert.GreaterOrEqual(t, elapsed, 3*time.Second, "should have retried for ~3s") + assert.Less(t, elapsed, 8*time.Second, "should not exceed timeout significantly") +} + +// TestWorkerWatcher_AllocateNoTimeout verifies that Allocate returns immediately +// when allocateTimeout is 0 (no retry). 
+func TestWorkerWatcher_AllocateNoTimeout(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + start := time.Now() + err := ww.Allocate() + elapsed := time.Since(start) + + require.Error(t, err) + assert.True(t, errors.Is(errors.WorkerAllocate, err)) + assert.Less(t, elapsed, time.Second, "should return immediately with no retry timeout") +} + +// TestWorkerWatcher_AllocateRetrySuccess verifies that Allocate retries and eventually succeeds. +// Validates the goto done path — ticker is stopped, worker is added to watch and container. +func TestWorkerWatcher_AllocateRetrySuccess(t *testing.T) { + log := slog.Default() + var failCount atomic.Int32 + failCount.Store(2) // fail twice, succeed on third + + ww := NewSyncWorkerWatcher(fakeAllocator(t, &failCount), log, 1, 10*time.Second) + t.Cleanup(func() { shutdownWatcher(ww) }) + + err := ww.Allocate() + require.NoError(t, err) + + // Worker should now be in the watcher's list + workers := ww.List() + assert.Len(t, workers, 1) + assert.Equal(t, fsm.StateReady, workers[0].State().CurrentState()) +} + +// TestWorkerWatcher_RemoveWorker_CannotRemoveLast verifies the last worker cannot be removed. +// The numWorkers==1 guard in RemoveWorker() prevents removing the last worker. If broken, pool panics on "no workers available". +func TestWorkerWatcher_RemoveWorker_CannotRemoveLast(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 5*time.Second) + t.Cleanup(func() { shutdownWatcher(ww) }) + + // numWorkers is 1 (set in constructor). RemoveWorker should refuse. + assert.Equal(t, uint64(1), ww.numWorkers.Load()) + + // RemoveWorker internally calls Take which needs workers in the container, + // but the numWorkers==1 guard in RemoveWorker() checks first and returns nil. 
+ err := ww.RemoveWorker(t.Context()) + require.NoError(t, err) + assert.Equal(t, uint64(1), ww.numWorkers.Load(), "should not decrement the last worker") +} + +// TestWorkerWatcher_Take_FastPath_ReadyWorker verifies Take returns immediately for Ready workers. +func TestWorkerWatcher_Take_FastPath_ReadyWorker(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + w := createStartedWorker(t, fsm.StateReady) + ww.container.Push(w) + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + + got, err := ww.Take(ctx) + require.NoError(t, err) + assert.Equal(t, w.Pid(), got.Pid()) + assert.Equal(t, fsm.StateReady, got.State().CurrentState()) +} + +// TestWorkerWatcher_Take_SlowPath_SkipsBadWorkers verifies Take skips non-Ready workers +// and kills them. The first non-Ready worker triggers the slow path via Kill() (works without relay). +// Subsequent bad workers are killed via Stop() — we avoid that by only having one bad + one good. +func TestWorkerWatcher_Take_SlowPath_SkipsBadWorkers(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 2, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + // First worker: TTL'd (not Ready) — triggers slow path, gets killed via Kill() + w1 := createStartedWorker(t, fsm.StateTTLReached) + // Second worker: Ready — should be returned + w2 := createStartedWorker(t, fsm.StateReady) + + ww.container.Push(w1) + ww.container.Push(w2) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + got, err := ww.Take(ctx) + require.NoError(t, err) + assert.Equal(t, w2.Pid(), got.Pid()) + assert.Equal(t, fsm.StateReady, got.State().CurrentState()) +} + +// TestWorkerWatcher_Release_ReadyState_PushesToContainer verifies releasing a Ready worker +// pushes it back to the container for reuse. 
+func TestWorkerWatcher_Release_ReadyState_PushesToContainer(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + w := createStartedWorker(t, fsm.StateReady) + initialLen := ww.container.Len() + + ww.Release(w) + + assert.Equal(t, initialLen+1, ww.container.Len(), "ready worker should be pushed to container") +} + +// TestWorkerWatcher_Take_AfterDestroy verifies Take returns WatcherStopped after container is destroyed. +func TestWorkerWatcher_Take_AfterDestroy(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 0) + + // Push a worker and destroy the container + w := createStartedWorker(t, fsm.StateReady) + ww.container.Push(w) + ww.container.Destroy() + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + + _, err := ww.Take(ctx) + require.Error(t, err) + assert.True(t, errors.Is(errors.WatcherStopped, err)) +} + +// TestWorkerWatcher_List_ReturnsAllWorkers verifies List returns a copy of all tracked workers. +func TestWorkerWatcher_List_ReturnsAllWorkers(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 3, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + // Store 3 workers directly in the workers map with unique keys. + // We use synthetic keys because our fake workers all have pid=0 + // (they bypass w.Start() which sets the pid field). + for i := range 3 { + w := createStartedWorker(t, fsm.StateReady) + ww.workers.Store(int64(i+1), w) + } + + workers := ww.List() + assert.Len(t, workers, 3) +} + +// TestWorkerWatcher_List_EmptyWatcher verifies List returns nil when numWorkers is 0. 
+func TestWorkerWatcher_List_EmptyWatcher(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 0, 0) + t.Cleanup(func() { shutdownWatcher(ww) }) + + workers := ww.List() + assert.Nil(t, workers) +} + +// TestWorkerWatcher_Allocate_StopChExitsRetryLoop verifies that sending on stopCh +// causes the Allocate retry loop to exit promptly with WatcherStopped. +func TestWorkerWatcher_Allocate_StopChExitsRetryLoop(t *testing.T) { + log := slog.Default() + ww := NewSyncWorkerWatcher(alwaysFailAllocator(), log, 1, 10*time.Second) + + // Send on stopCh after 1 second (in the background) + go func() { + time.Sleep(time.Second) + ww.stopCh <- struct{}{} + }() + + start := time.Now() + err := ww.Allocate() + elapsed := time.Since(start) + + require.Error(t, err) + assert.True(t, errors.Is(errors.WatcherStopped, err)) + assert.Less(t, elapsed, 3*time.Second, "should exit promptly via stopCh") +}