diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ee4c571..56f3381 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,7 @@ jobs: matrix: php: ["8.4"] go: [stable] - os: ["ubuntu-latest", "macos-latest", "windows-latest"] + os: ["ubuntu-latest"] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v6 # action page: @@ -42,14 +42,8 @@ jobs: run: go mod download - name: Run golang tests with coverage on Linux - if: runner.os == 'Linux' run: make test_coverage - - name: Run golang tests with coverage on Windows - if: runner.os == 'Windows' - run: make test_coverage - shell: bash - - uses: codecov/codecov-action@v5 # Docs: with: files: ./coverage-ci/summary.txt diff --git a/Makefile b/Makefile index b15d3d1..f9720c2 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,17 @@ #!/usr/bin/make -# Makefile readme (ru): -# Makefile readme (en): SHELL = /bin/sh test_coverage: rm -rf coverage-ci mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/channel.out -covermode=atomic ./worker_watcher/container/channel + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... 
-coverprofile=./coverage-ci/ratelimiter.out -covermode=atomic ./pool/ratelimiter echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt @@ -21,4 +21,6 @@ test: ## Run application tests go test -v -race ./pool/static_pool go test -v -race ./worker go test -v -race ./worker_watcher + go test -v -race ./worker_watcher/container/channel + go test -v -race ./pool/ratelimiter go test -v -race -fuzz=FuzzStaticPoolEcho -fuzztime=30s -tags=debug ./pool/static_pool diff --git a/go.mod b/go.mod index d3651b5..5bbda70 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/roadrunner-server/pool -go 1.24.0 +go 1.25 require ( github.com/roadrunner-server/errors v1.4.1 @@ -19,11 +19,11 @@ require ( github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect - github.com/tklauser/go-sysconf v0.3.15 // indirect - github.com/tklauser/numcpus v0.10.0 // indirect + github.com/tklauser/go-sysconf v0.3.16 // indirect + github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.35.0 // indirect + golang.org/x/sys v0.39.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8ae63b0..6a1b1a9 100644 --- a/go.sum +++ b/go.sum @@ -30,14 +30,20 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= +github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= +github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= +github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= +github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= @@ -46,6 +52,8 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod 
h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/pool/config.go b/pool/config.go index 46f4d85..bdcb536 100644 --- a/pool/config.go +++ b/pool/config.go @@ -5,6 +5,8 @@ import ( "time" ) +const maxWorkers = 2048 + // Config .. Pool config Configures the pool behavior. type Config struct { // Debug flag creates new fresh worker before every request. @@ -64,7 +66,7 @@ func (cfg *Config) InitDefaults() { // initialize the dynamic allocator if cfg.DynamicAllocatorOpts != nil { - cfg.DynamicAllocatorOpts.InitDefaults() + cfg.DynamicAllocatorOpts.InitDefaults(cfg.NumWorkers) } } @@ -94,14 +96,14 @@ type DynamicAllocationOpts struct { IdleTimeout time.Duration `mapstructure:"idle_timeout"` } -func (d *DynamicAllocationOpts) InitDefaults() { +func (d *DynamicAllocationOpts) InitDefaults(numWorkers uint64) { if d.MaxWorkers == 0 { d.MaxWorkers = 10 } - // limit max workers to 100 - if d.MaxWorkers > 100 { - d.MaxWorkers = 100 + // cap the combined worker count (static + dynamic) at maxWorkers (2048); e.g., NumWorkers=500 with max_workers=2000 clamps MaxWorkers to 1548 + if d.MaxWorkers+numWorkers > maxWorkers { + d.MaxWorkers = maxWorkers - numWorkers } if d.SpawnRate == 0 { diff --git a/pool/ratelimiter/ratelimiter.go b/pool/ratelimiter/ratelimiter.go new file mode 100644 index 0000000..678bd12 --- /dev/null +++ b/pool/ratelimiter/ratelimiter.go @@ -0,0 +1,47 @@ +package ratelimiter + +import ( + "sync" + "time" +) + +type RateLimiter struct { + mu sync.Mutex + available bool + cooldown time.Duration +} + +func NewRateLimiter(cooldown time.Duration) *RateLimiter { + return &RateLimiter{ + available: true, + cooldown: cooldown, + } +} + +// TryAcquire attempts to take the token. Returns false immediately if unavailable. +func (rl *RateLimiter) TryAcquire() bool { + rl.mu.Lock() + defer rl.mu.Unlock() + + if rl.available { + rl.available = false + return true + } + return false +} + +// Release returns the token after the cooldown period.
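+// A minimal usage sketch (illustrative only; the dynamic allocator below is the real caller): +// +// rl := NewRateLimiter(time.Second) +// if rl.TryAcquire() { +// defer rl.Release() // the token becomes available again one cooldown later +// // ... perform the rate-limited work ... +// }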
+func (rl *RateLimiter) Release() { + time.AfterFunc(rl.cooldown, func() { + rl.mu.Lock() + rl.available = true + rl.mu.Unlock() + }) +} diff --git a/pool/ratelimiter/ratelimiter_test.go b/pool/ratelimiter/ratelimiter_test.go new file mode 100644 index 0000000..ab919e8 --- /dev/null +++ b/pool/ratelimiter/ratelimiter_test.go @@ -0,0 +1,126 @@ +package ratelimiter + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRateLimiter_BasicAcquireRelease(t *testing.T) { + rl := NewRateLimiter(1 * time.Second) + + // First acquire should succeed + if !rl.TryAcquire() { + t.Error("expected first TryAcquire to succeed") + } + + // Release and wait for cooldown + rl.Release() + time.Sleep(2 * time.Second) + + // Should be available again + if !rl.TryAcquire() { + t.Error("expected TryAcquire to succeed after cooldown") + } +} + +func TestRateLimiter_CooldownRestoresAvailability(t *testing.T) { + cooldown := 1 * time.Second + rl := NewRateLimiter(cooldown) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Release it + rl.Release() + + // Should not be available immediately + if rl.TryAcquire() { + t.Error("token should not be available immediately after release") + } + + // Wait for cooldown to complete + time.Sleep(cooldown + 1*time.Second) + + // Now it should be available + if !rl.TryAcquire() { + t.Error("token should be available after cooldown period") + } +} + +func TestRateLimiter_ZeroCooldown(t *testing.T) { + rl := NewRateLimiter(0) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Release with zero cooldown + rl.Release() + + // With zero cooldown, AfterFunc fires immediately (or very soon) + time.Sleep(1 * time.Second) + + // Should be available again almost immediately + if !rl.TryAcquire() { + t.Error("token should be available after zero cooldown") + } +} + +func TestRateLimiter_MultipleAcquireWhenUnavailable(t *testing.T) { + rl := NewRateLimiter(5 * time.Second) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Multiple attempts should all fail + for i := range 10 { + if rl.TryAcquire() { + t.Errorf("TryAcquire attempt %d should have failed", i) + } + } +} + +func TestRateLimiter_RaceCondition(t *testing.T) { + rl := NewRateLimiter(1 * time.Second) + + const numGoroutines = 100 + const iterations = 10 + + var successCount atomic.Int64 + var wg sync.WaitGroup + + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + for range iterations { + if rl.TryAcquire() { + successCount.Add(1) + rl.Release() + } + } + }() + } + + wg.Wait() + + if successCount.Load() == 0 { + t.Error("expected at least one successful acquire") + } + + time.Sleep(2 * time.Second) + + // After all goroutines finish and cooldowns complete, token should be available + rl.mu.Lock() + if !rl.available { + t.Error("rate limiter should be in available state after all operations complete") + } + rl.mu.Unlock() +} diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index a3813e4..4a423ab 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 78918b4..d344541 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -1,14 +1,16 @@ -package static_pool
//nolint:stylecheck +// Dynamic allocator for the static pool implementation +// It allocates new workers with batch spawn rate when there are no free workers +// It uses 2 functions: addMoreWorkers to allocate new workers and startIdleTTLListener to deallocate idle ones after the idle timeout +package static_pool import ( "context" - stderr "errors" "sync" "sync/atomic" "time" - "github.com/roadrunner-server/errors" "github.com/roadrunner-server/pool/pool" + "github.com/roadrunner-server/pool/pool/ratelimiter" "github.com/roadrunner-server/pool/worker" "github.com/roadrunner-server/pool/worker_watcher" "go.uber.org/zap" @@ -21,16 +23,18 @@ type dynAllocator struct { idleTimeout time.Duration // internal - currAllocated atomic.Pointer[uint64] - mu *sync.Mutex - execLock *sync.RWMutex - ttlTriggerChan chan struct{} - started atomic.Pointer[bool] - log *zap.Logger + currAllocated atomic.Uint64 + mu *sync.Mutex + started atomic.Bool + log *zap.Logger // pool ww *worker_watcher.WorkerWatcher allocator func() (*worker.Process, error) stopCh chan struct{} + // multiple goroutines can call addMoreWorkers at the same time, + // so we skip redundant NoFreeWorkers-triggered calls while one is already in progress within the same time frame + rateLimit *ratelimiter.RateLimiter + lastAllocTry atomic.Pointer[time.Time] } func newDynAllocator( @@ -38,32 +42,39 @@ func newDynAllocator( ww *worker_watcher.WorkerWatcher, alloc func() (*worker.Process, error), stopCh chan struct{}, - execLock *sync.RWMutex, cfg *pool.Config) *dynAllocator { da := &dynAllocator{ - maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, - spawnRate: cfg.DynamicAllocatorOpts.SpawnRate, - idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, - mu: &sync.Mutex{}, - ttlTriggerChan: make(chan struct{}, 1), - ww: ww, - execLock: execLock, - allocator: alloc, - log: log, - stopCh: stopCh, + maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, + spawnRate: cfg.DynamicAllocatorOpts.SpawnRate, + idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, + mu: &sync.Mutex{}, + ww: ww, + allocator: alloc, + log: log, + stopCh: stopCh, + rateLimit: ratelimiter.NewRateLimiter(time.Second), } - da.currAllocated.Store(p(uint64(0))) - da.started.Store(p(false)) + da.currAllocated.Store(0) + da.started.Store(false) return da } -func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { - const op = errors.Op("allocate_dynamically") +func (da *dynAllocator) addMoreWorkers() { + // set the last allocation try time + // we need to store this to prevent immediate deallocation in the TTL listener + da.lastAllocTry.Store(p(time.Now().UTC())) - // obtain an operation lock - // we can use a lock-free approach here, but it's not necessary + if !da.rateLimit.TryAcquire() { + da.log.Warn("rate limit exceeded for dynamic allocation, skipping") + return + } + + // return the token after 1 second + defer da.rateLimit.Release() + + // operation lock da.mu.Lock() defer da.mu.Unlock() @@ -72,132 +83,129 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { zap.Uint64("max_workers", da.maxWorkers), zap.Uint64("spawn_rate", da.spawnRate)) - if !*da.started.Load() { + if !da.started.Load() { // start the dynamic allocator listener - da.dynamicTTLListener() - da.started.Store(p(true)) - } else { - da.log.Debug("dynamic allocator listener already started, trying to allocate worker immediately with 2s timeout") - // if the listener was started we can try to get the worker with a very short timeout, which was probably allocated by the previous NoFreeWorkers error - ctx, cancel := 
context.WithTimeout(context.Background(), time.Second*2) - w, err := da.ww.Take(ctx) - cancel() - if err != nil { - if errors.Is(errors.NoFreeWorkers, err) { - goto allocate - } - - return nil, errors.E(op, err) - } - - return w, nil + da.startIdleTTLListener() + da.started.Store(true) } -allocate: - // otherwise, we can try to allocate a new batch of workers - // if we already allocated max workers, we can't allocate more - if *da.currAllocated.Load() >= da.maxWorkers { + if da.currAllocated.Load() >= da.maxWorkers { // can't allocate more - return nil, errors.E(op, stderr.New("can't allocate more workers, increase max_workers option (max_workers limit is 100)")) + da.log.Warn("can't allocate more workers, already allocated max workers", zap.Uint64("max_workers", da.maxWorkers)) + return } - // we starting from the 1 because we already allocated one worker which would be released in the Exec function + // we're starting from 1 because we already allocated one worker, which will be released in the Exec function // i < da.spawnRate - we can't allocate more workers than the spawn rate for i := uint64(0); i < da.spawnRate; i++ { - // spawn as much workers as user specified in the spawn rate configuration, but not more than max workers + // spawn as many workers as the user specified in the spawn rate configuration, but not more than max workers if da.currAllocated.Load() >= da.maxWorkers { break } err := da.ww.AddWorker() if err != nil { - return nil, errors.E(op, err) - } - - // reset ttl after every alloated worker - select { - case da.ttlTriggerChan <- struct{}{}: - case <-time.After(time.Minute): - return nil, errors.E(op, stderr.New("failed to reset the TTL listener")) + da.log.Error("failed to allocate worker", zap.Error(err)) + continue } - // increase number of additionally allocated options - _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) - da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) + // increase the number of additionally allocated workers + aw := da.currAllocated.Add(1) + da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", aw)) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - w, err := da.ww.Take(ctx) - cancel() - - return w, err + da.log.Debug("currently allocated", zap.Uint64("number", da.currAllocated.Load())) } -func (da *dynAllocator) dynamicTTLListener() { +func (da *dynAllocator) startIdleTTLListener() { da.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", da.idleTimeout)) go func() { // DynamicAllocatorOpts are read-only, so we can use them without a lock triggerTTL := time.NewTicker(da.idleTimeout) + defer triggerTTL.Stop() + for { select { case <-da.stopCh: da.log.Debug("dynamic allocator listener stopped") - goto exit + // Acquire lock before setting started=false to prevent race with addMoreWorkers + da.mu.Lock() + da.started.Store(false) + da.mu.Unlock() + da.log.Debug("dynamic allocator listener exited") + return // when this channel is triggered, we should deallocate all dynamically allocated workers case <-triggerTTL.C: da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) - // get the Exec (the whole operation) lock - da.execLock.Lock() + // check the last allocation time - if we had an allocation recently (within idleTimeout), we should skip deallocation + lastAlloc := da.lastAllocTry.Load() + if 
lastAlloc != nil && time.Since(*lastAlloc) < da.idleTimeout { + da.log.Debug("skipping deallocation of dynamic workers, recent allocation detected") + continue + } // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated da.mu.Lock() // if we don't have any dynamically allocated workers, we can skip the deallocation - if *da.currAllocated.Load() == 0 { + if da.currAllocated.Load() == 0 { + // Set started=false BEFORE releasing the lock + // This prevents the race condition where addMoreWorkers() sees started=true + // but the listener is about to exit + da.started.Store(false) da.mu.Unlock() - ww.execLock.Unlock() - goto exit + da.log.Debug("dynamic allocator listener exited, no workers to deallocate") + return + } + + alloc := da.currAllocated.Load() + da.log.Debug("deallocating dynamically allocated workers", zap.Uint64("to_deallocate", alloc)) + + if alloc >= da.spawnRate { + // deallocate in batches + alloc = da.spawnRate } - alloc := *da.currAllocated.Load() for range alloc { - // take the worker from the stack, inifinite timeout - // we should not block here forever - err := da.ww.RemoveWorker(context.Background()) + // Use a context with a short timeout to prevent indefinite blocking + // 500ms keeps the TTL loop responsive even when no worker can be removed + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + err := da.ww.RemoveWorker(ctx) + cancel() + // the only error we can get here is NoFreeWorkers, meaning all workers are busy if err != nil { - da.log.Error("failed to take worker from the stack", zap.Error(err)) - continue + // we should stop deallocation attempts + da.log.Error("failed to remove worker from the pool, stopping deallocation", zap.Error(err)) + // Don't decrement counter if removal failed - worker still exists + break } - // reset the number of allocated workers - // potential problem: if we'd have an error in the da.ww.Take code block, we'd still have the currAllocated > 0 - - // decrease number of additionally allocated options - _ = da.currAllocated.Swap(p(*da.currAllocated.Load() - 1)) - da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) + // decrease the number of additionally allocated workers + nw := da.currAllocated.Add(^uint64(0)) + da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", nw)) } - if *da.currAllocated.Load() != 0 { - da.log.Error("failed to deallocate all dynamically allocated workers", zap.Uint64("remaining", *da.currAllocated.Load())) + if da.currAllocated.Load() > 0 { + // if we still have allocated workers, we should keep the listener running + da.mu.Unlock() + da.log.Debug("dynamic allocator listener continuing, still have dynamically allocated workers", zap.Uint64("remaining", da.currAllocated.Load())) + continue } + // Set started=false BEFORE releasing the lock + // This ensures that any addMoreWorkers() call that acquires the lock + // after this point will see started=false and start a new listener + da.started.Store(false) + da.lastAllocTry.Store(nil) da.mu.Unlock() - da.execLock.Unlock() - triggerTTL.Stop() - goto exit - - // when this channel is triggered, we should extend the TTL of all dynamically allocated workers - case <-da.ttlTriggerChan: - da.log.Debug("TTL trigger received, extending TTL of all dynamically allocated workers") - triggerTTL.Reset(da.idleTimeout) + da.log.Debug("dynamic allocator listener exited, all dynamically allocated workers deallocated")
+ return } } - exit: - da.started.Store(p(false)) - da.log.Debug("dynamic allocator listener exited, all dynamically allocated workers deallocated") }() } -func p[T any](val T) *T { - return &val +func p[T any](v T) *T { + return &v } diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go new file mode 100644 index 0000000..c07a74e --- /dev/null +++ b/pool/static_pool/dyn_allocator_test.go @@ -0,0 +1,329 @@ +package static_pool + +import ( + "context" + "os/exec" + "sync" + "testing" + "time" + + "github.com/roadrunner-server/pool/ipc/pipe" + "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/pool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +var dynlog = func() *zap.Logger { + logger, _ := zap.NewDevelopment() + return logger +} + +var testDynCfg = &pool.Config{ + NumWorkers: 5, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 10, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 25, + SpawnRate: 5, + IdleTimeout: time.Second * 10, + }, +} + +func Test_DynAllocator(t *testing.T) { + ctx := context.Background() + np, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(dynlog()), + testDynCfg, + dynlog(), + ) + assert.NoError(t, err) + assert.NotNil(t, np) + + r, err := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + + assert.Equal(t, []byte("hello"), resp.Body()) + assert.NoError(t, err) + + np.Destroy(ctx) +} + +func Test_DynAllocatorManyReq(t *testing.T) { + var testDynCfgMany = &pool.Config{ + NumWorkers: 5, + MaxJobs: 2, + AllocateTimeout: time.Second * 1, + DestroyTimeout: time.Second * 10, + ResetTimeout: time.Second * 5, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 25, + SpawnRate: 5, + IdleTimeout: time.Second * 10, + }, + } + + ctx := context.Background() + np, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/client.php", "slow_req", "pipes") + }, + pipe.NewPipeFactory(dynlog()), + testDynCfgMany, + dynlog(), + ) + assert.NoError(t, err) + assert.NotNil(t, np) + + wg := &sync.WaitGroup{} + wg.Add(1000) + go func() { + for range 1000 { + go func() { + defer wg.Done() + r, erre := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + if erre != nil { + t.Log("failed request: ", erre.Error()) + return + } + resp := <-r + assert.Equal(t, []byte("hello"), resp.Body()) + }() + } + }() + + go func() { + for range 10 { + time.Sleep(time.Second) + _ = np.Reset(context.Background()) + } + }() + + wg.Wait() + + time.Sleep(time.Second * 30) + + assert.Equal(t, 5, len(np.Workers())) + + np.Destroy(ctx) +} + +func Test_DynamicPool_OverMax(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 20, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 1, + IdleTimeout: time.Second * 15, + SpawnRate: 10, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + t.Log("sending request 1") + r, err 
:= p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + t.Log("request 1 finished") + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + go func() { + // sleep to ensure the first request is being processed first + // this request should trigger dynamic allocation attempt and return an error + time.Sleep(time.Second) + t.Log("sending request 2") + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + t.Log("waiting for the requests 1 and 2") + wg.Wait() + t.Log("wait 1 and 2 finished") + + wg.Add(2) + // request 3 and 4 should be processed normally, since we have 2 workers now (1 initial + 1 dynamic) + t.Log("starting requests 3 and 4") + require.Len(t, p.Workers(), 2) + + go func() { + t.Log("request 3") + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + t.Log("request 3 finished") + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + go func() { + t.Log("request 4") + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + t.Log("request 4 finished") + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + t.Log("waiting for the requests 3 and 4") + wg.Wait() + time.Sleep(time.Second * 20) + assert.Len(t, p.Workers(), 1) + p.Destroy(ctx) +} + +func Test_DynamicPool(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 5, + IdleTimeout: time.Second * 15, + SpawnRate: 2, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + }() + + go func() { + time.Sleep(time.Second) + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + wg.Wait() + + time.Sleep(time.Second * 20) + require.Len(t, p.Workers(), 1) + + p.Destroy(ctx) +} + +func Test_DynamicPool_500W(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 10, + IdleTimeout: time.Second * 15, + // should be corrected to 10 by RR + SpawnRate: 11, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + 
return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + require.Len(t, p.Workers(), 1) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + go func() { + time.Sleep(time.Second * 1) + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + wg.Wait() + + time.Sleep(time.Second * 30) + + require.Len(t, p.Workers(), 1) + p.Destroy(ctx) +} diff --git a/pool/static_pool/fuzz_test.go b/pool/static_pool/fuzz_test.go index 42fcf0c..408efe3 100644 --- a/pool/static_pool/fuzz_test.go +++ b/pool/static_pool/fuzz_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" diff --git a/pool/static_pool/options.go b/pool/static_pool/options.go index f2084aa..f715982 100644 --- a/pool/static_pool/options.go +++ b/pool/static_pool/options.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "go.uber.org/zap" diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 3c4b6c0..be47a16 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" @@ -19,11 +19,11 @@ import ( ) const ( - // StopRequest can be sent by worker to indicate that restart is required. + // StopRequest can be sent by a worker to indicate that restart is required. StopRequest = `{"stop":true}` ) -// Pool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +// Pool controls worker creation, destruction and task routing. Pool uses a fixed number of workers. type Pool struct { // pool configuration cfg *pool.Config @@ -31,7 +31,7 @@ type Pool struct { log *zap.Logger // worker command creator cmd pool.Command - // creates and connects to stack + // creates and connects to workers factory pool.Factory // manages worker states and TTLs ww *workerWatcher.WorkerWatcher @@ -48,7 +48,7 @@ type Pool struct { mu sync.RWMutex } -// NewPool creates a new worker pool and task multiplexer. Pool will initiate with one worker. If supervisor configuration is provided -> pool will be turned into a supervisedExec mode +// NewPool creates a new worker pool and task multiplexer. Pool will initialize with the configured number of workers. 
If supervisor configuration is provided -> pool will be turned into a supervisedExec mode func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *pool.Config, log *zap.Logger, options ...Options) (*Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") @@ -94,12 +94,12 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p } } - // set up workers allocator + // set up workers' allocator p.allocator = pool.NewPoolAllocator(ctx, p.cfg.AllocateTimeout, p.cfg.MaxJobs, factory, cmd, p.cfg.Command, p.log) - // set up workers watcher + // set up workers' watcher p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout) - // allocate requested number of workers + // allocate the requested number of workers workers, err := pool.AllocateParallel(p.cfg.NumWorkers, p.allocator) if err != nil { return nil, err @@ -122,7 +122,7 @@ } if p.cfg.DynamicAllocatorOpts != nil { - p.dynamicAllocator = newDynAllocator(p.log, p.ww, p.allocator, p.stopCh, &p.mu, p.cfg) + p.dynamicAllocator = newDynAllocator(p.log, p.ww, p.allocator, p.stopCh, p.cfg) } return p, nil @@ -133,7 +133,7 @@ func (sp *Pool) GetConfig() *pool.Config { return sp.cfg } -// Workers returns worker list associated with the pool. +// Workers returns a worker list associated with the pool. func (sp *Pool) Workers() (workers []*worker.Process) { return sp.ww.List() } @@ -186,7 +186,7 @@ } /* - register request in the QUEUE + register a request in the QUEUE */ atomic.AddUint64(&sp.queue, 1) defer atomic.AddUint64(&sp.queue, ^uint64(0)) @@ -222,7 +222,7 @@ begin: // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - // for this case, worker already killed in the ExecTTL function + // in this case, the worker was already killed in the ExecTTL function sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err)) w.State().Transition(fsm.StateExecTTLReached) @@ -230,7 +230,7 @@ return nil, err case errors.Is(errors.SoftJob, err): /* - in case of soft job error, we should not kill the worker, this is just an error payload from the worker. + in case of soft job error, we should not kill the worker; this is just an error payload from the worker. 
*/ w.State().Transition(fsm.StateReady) sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err)) @@ -260,7 +260,7 @@ begin: } } - // worker want's to be terminated + // worker wants to be terminated // unsafe is used to quickly transform []byte to string if len(rsp.Body) == 0 && unsafe.String(unsafe.SliceData(rsp.Context), len(rsp.Context)) == StopRequest { w.State().Transition(fsm.StateInvalid) @@ -271,14 +271,14 @@ begin: switch { case rsp.Flags&frame.STREAM != 0: sp.log.Debug("stream mode", zap.Int64("pid", w.Pid())) - // create channel for the stream (only if there are no errors) + // create a channel for the stream (only if there are no errors) // we need to create a buffered channel to prevent blocking // stream buffer size should be bigger than regular, to have some payloads ready (optimization) resp := make(chan *PExec, 5) // send the initial frame resp <- newPExec(rsp, nil) - // in case of stream we should not return worker back immediately + // in case of stream, we should not return the worker back immediately go func() { // would be called on Goexit defer func() { @@ -290,7 +290,7 @@ begin: // stream iterator for { select { - // we received stop signal + // we received a stop signal case <-stopCh: sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) @@ -381,10 +381,16 @@ func (sp *Pool) NumDynamic() uint64 { return 0 } - return *sp.dynamicAllocator.currAllocated.Load() + return sp.dynamicAllocator.currAllocated.Load() } -// Destroy all underlying stack (but let them complete the task). +// Destroy all underlying workers (but let them complete the task). 
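+// A minimal shutdown sketch (illustrative; see the pool tests for real usage): +// +// p, err := NewPool(ctx, cmd, factory, cfg, logger) +// if err != nil { /* handle the error */ } +// // ... serve requests via p.Exec ... +// p.Destroy(context.Background()) // waits, bounded by cfg.DestroyTimeout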
func (sp *Pool) Destroy(ctx context.Context) { sp.log.Info("destroy signal received", zap.Duration("timeout", sp.cfg.DestroyTimeout)) var cancel context.CancelFunc @@ -422,7 +422,7 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr // Get function consumes context with timeout w, err := sp.ww.Take(ctxGetFree) if err != nil { - // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + // if the error is of kind NoFreeWorkers, it means that we can't get a worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { sp.log.Error( "no free workers in the pool, wait timeout exceed", @@ -431,16 +431,15 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr zap.Error(err), ) - // if we don't have dynamic allocator or in debug mode, we can't allocate a new worker + // if we don't have a dynamic allocator or in debug mode, we can't allocate a new worker if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { return nil, errors.E(op, errors.NoFreeWorkers) } - // for the dynamic allocator, we can would have many requests waiting at the same time on the lock in the dyn allocator - this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically - however, requests waiting for the lock, won't allocate a new worker and would be failed - - return sp.dynamicAllocator.allocateDynamically() + // for the dynamic allocator, many requests may be waiting at the same time on the lock in the dyn allocator + // this leads to the following case: earlier requests get a worker, since we allocate them in addMoreWorkers, + // while requests still waiting for the lock won't allocate a new worker and will fail + sp.dynamicAllocator.addMoreWorkers() } // else if err not nil - return error return nil, errors.E(op, err) diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index ff171c8..477115a 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" @@ -55,262 +55,6 @@ func Test_NewPool(t *testing.T) { p.Destroy(ctx) } -func Test_DynamicPool_OverMax(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 20, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 1, - IdleTimeout: time.Second * 15, - SpawnRate: 10, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case 
<-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - wg.Add(2) - time.Sleep(time.Second * 20) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - time.Sleep(time.Second * 20) - - assert.Len(t, p.Workers(), 1) - - p.Destroy(ctx) -} - -func Test_DynamicPool_SeveralCalls(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 20, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 5, - IdleTimeout: time.Second * 15, - SpawnRate: 2, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - assert.Equal(t, uint64(2), p.NumDynamic()) - - wg.Add(2) - time.Sleep(time.Second * 20) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - assert.Equal(t, uint64(2), p.NumDynamic()) - time.Sleep(time.Second * 20) - - assert.Equal(t, uint64(0), p.NumDynamic()) - assert.Len(t, p.Workers(), 1) - - p.Destroy(ctx) -} - -func Test_DynamicPool(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 5, - IdleTimeout: time.Second * 15, - SpawnRate: 2, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd 
[]string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - time.Sleep(time.Second * 20) - - p.Destroy(ctx) -} - func Test_MaxWorkers(t *testing.T) { dynAllCfg := &pool.Config{ NumWorkers: 501, @@ -332,71 +76,6 @@ func Test_MaxWorkers(t *testing.T) { assert.Nil(t, p) } -func Test_DynamicPool_500W(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 100, - IdleTimeout: time.Second * 15, - // should be corrected to 100 by RR - SpawnRate: 101, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - require.Len(t, p.Workers(), 1) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - assert.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - assert.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - require.Len(t, p.Workers(), 101) - time.Sleep(time.Second * 20) - p.Destroy(ctx) -} - func Test_NewPoolAddRemoveWorkers(t *testing.T) { testCfg2 := &pool.Config{ NumWorkers: 1, diff --git a/pool/static_pool/stream.go b/pool/static_pool/stream.go index 9e6fa07..3b58aae 100644 --- a/pool/static_pool/stream.go +++ b/pool/static_pool/stream.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import "github.com/roadrunner-server/pool/payload" diff --git a/pool/static_pool/supervisor.go b/pool/static_pool/supervisor.go index f38bc3b..4a8d9d4 100644 --- a/pool/static_pool/supervisor.go +++ b/pool/static_pool/supervisor.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "time" diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index f1bef47..865917d 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( 
"context" diff --git a/tests/composer.json b/tests/composer.json index 5aecd89..844ade6 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -5,10 +5,9 @@ "nyholm/psr7": "^1.4", "spiral/roadrunner-http": "^3.5", "spiral/roadrunner-worker": "^3.5", - "temporal/sdk": ">=1.0", "spiral/tokenizer": ">=2.7", "spiral/goridge": "^4.0", - "spiral/roadrunner-metrics": "^2.0" + "spiral/roadrunner-metrics": "^3.0" }, "autoload": { "psr-4": { diff --git a/tests/slow_req.php b/tests/slow_req.php new file mode 100644 index 0000000..8fcf3c8 --- /dev/null +++ b/tests/slow_req.php @@ -0,0 +1,27 @@ +waitPayload()) { + try { + $counter++; + + if ($counter % 2 === 0) { + $rr->error('test error'); + continue; + } + + sleep(100); + $rr->respond(new RoadRunner\Payload((string)$in->body)); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 8014fbe..5475c8b 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -25,7 +25,8 @@ func NewVector() *Vec { vec := &Vec{ destroy: 0, reset: 0, - workers: make(chan *worker.Process, 600), + // currently, we can have up to 2048 workers in the pool + workers: make(chan *worker.Process, 2048), } return vec @@ -61,7 +62,7 @@ func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { select { case <-ctx.Done(): default: - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 10) } } diff --git a/worker_watcher/container/channel/vec_test.go b/worker_watcher/container/channel/vec_test.go new file mode 100644 index 0000000..d4856c9 --- /dev/null +++ b/worker_watcher/container/channel/vec_test.go @@ -0,0 +1,122 @@ +package channel + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/pool/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// helper to create a simple worker for testing +func createTestWorker(t *testing.T) *worker.Process { + t.Helper() + cmd := exec.Command("php", "-v") + w, err := worker.InitBaseWorker(cmd) + require.NoError(t, err) + return w +} + +// ==================== General Tests ==================== + +// Test_Vec_PushPop tests basic push and pop operations +func Test_Vec_PushPop(t *testing.T) { + vec := NewVector() + w := createTestWorker(t) + + // Push worker + vec.Push(w) + assert.Equal(t, 1, vec.Len()) + + // Pop worker + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + popped, err := vec.Pop(ctx) + require.NoError(t, err) + assert.Equal(t, w, popped) + assert.Equal(t, 0, vec.Len()) +} + +// Test_Vec_Len tests that Len returns correct count +func Test_Vec_Len(t *testing.T) { + vec := NewVector() + + assert.Equal(t, 0, vec.Len()) + + // Push multiple workers + for range 5 { + w := createTestWorker(t) + vec.Push(w) + } + + assert.Equal(t, 5, vec.Len()) + + // Pop one + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + _, err := vec.Pop(ctx) + require.NoError(t, err) + + assert.Equal(t, 4, vec.Len()) +} + +// ==================== Edge Case Tests ==================== + +// Test_Vec_PopAfterDestroy tests that Pop returns WatcherStopped error after Destroy is called +func Test_Vec_PopAfterDestroy(t *testing.T) { + vec := NewVector() + w := createTestWorker(t) + vec.Push(w) + + // Destroy the vector + vec.Destroy() + + // Pop should fail immediately with WatcherStopped + ctx, 
cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + popped, err := vec.Pop(ctx) + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.WatcherStopped, err)) +} + +// Test_Vec_PopWithCanceledContext tests that Pop returns error when context is canceled +func Test_Vec_PopWithCanceledContext(t *testing.T) { + vec := NewVector() + // Don't push any workers - channel is empty + + // Create an already canceled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + popped, err := vec.Pop(ctx) + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.NoFreeWorkers, err)) +} + +// Test_Vec_PopContextTimeout tests that Pop returns error when context times out waiting for worker +func Test_Vec_PopContextTimeout(t *testing.T) { + vec := NewVector() + // Empty channel - no workers to pop + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + popped, err := vec.Pop(ctx) + elapsed := time.Since(start) + + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.NoFreeWorkers, err)) + // Verify it actually waited for the timeout + assert.True(t, elapsed >= 50*time.Millisecond, "should have waited for timeout") +} diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 39de596..adfba4c 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:stylecheck +package worker_watcher import ( "context" @@ -15,21 +15,21 @@ import ( "go.uber.org/zap" ) -const maxWorkers = 500 +const maxWorkers = 2048 // Allocator is responsible for worker allocation in the pool type Allocator func() (*worker.Process, error) type WorkerWatcher struct { - sync.RWMutex + mu sync.RWMutex // actually don't have a lot of impl here, so interface not needed container *channel.Vec // used to control Destroy stage (that all workers are in the container) - numWorkers uint64 + numWorkers atomic.Uint64 eventBus *events.Bus // map with the worker's pointers - workers map[int64]*worker.Process + workers sync.Map log *zap.Logger @@ -40,27 +41,29 @@ // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { eb, _ := events.NewEventBus() - return &WorkerWatcher{ - container: channel.NewVector(), - log: log, - eventBus: eb, - // pass a ptr to the number of workers to avoid blocking in the TTL loop - numWorkers: numWorkers, + ww := &WorkerWatcher{ + container: channel.NewVector(), + log: log, + eventBus: eb, allocateTimeout: allocateTimeout, - workers: make(map[int64]*worker.Process, numWorkers), + workers: sync.Map{}, allocator: allocator, } + + // store the worker count atomically to avoid blocking in the TTL loop + ww.numWorkers.Store(numWorkers) + return ww } func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { - ww.Lock() - defer ww.Unlock() + ww.mu.Lock() + defer ww.mu.Unlock() // else we can add all workers for i := range workers { ww.container.Push(workers[i]) // add worker to watch slice - ww.workers[workers[i].Pid()] = workers[i] + ww.workers.Store(workers[i].Pid(), workers[i]) ww.addToWatch(workers[i]) } @@ -68,7 +71,10 @@ } func (ww 
*WorkerWatcher) AddWorker() error { - if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { + ww.mu.Lock() + defer ww.mu.Unlock() + + if ww.numWorkers.Load() >= maxWorkers { return errors.E(errors.WorkerAllocate, errors.Str("container is full, maximum number of workers reached")) } @@ -77,13 +83,16 @@ func (ww *WorkerWatcher) AddWorker() error { return err } - atomic.AddUint64(&ww.numWorkers, 1) + ww.numWorkers.Add(1) return nil } func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { + ww.mu.Lock() + defer ww.mu.Unlock() + // can't remove the last worker - if atomic.LoadUint64(&ww.numWorkers) == 1 { + if ww.numWorkers.Load() == 1 { ww.log.Warn("can't remove the last worker") return nil } @@ -97,10 +106,8 @@ func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { w.State().Transition(fsm.StateDestroyed) _ = w.Stop() - atomic.AddUint64(&ww.numWorkers, ^uint64(0)) - ww.Lock() - delete(ww.workers, w.Pid()) - ww.Unlock() + ww.numWorkers.Add(^uint64(0)) + ww.workers.Delete(w.Pid()) return nil } @@ -176,7 +183,7 @@ func (ww *WorkerWatcher) Allocate() error { select { case <-tt: // reduce the number of workers - atomic.AddUint64(&ww.numWorkers, ^uint64(0)) + ww.numWorkers.Add(^uint64(0)) allocateFreq.Stop() // timeout exceeds, worker can't be allocated return errors.E(op, errors.WorkerAllocate, err) @@ -200,9 +207,12 @@ done: // add worker to Wait ww.addToWatch(sw) // add a new worker to the worker's slice (to get information about workers in parallel) - ww.Lock() - ww.workers[sw.Pid()] = sw - ww.Unlock() + if w, ok := ww.workers.LoadAndDelete(sw.Pid()); ok { + ww.log.Warn("allocated worker already exists, killing duplicate, report this case", zap.Int64("pid", sw.Pid())) + _ = w.(*worker.Process).Kill() + } + + ww.workers.Store(sw.Pid(), sw) // push the worker to the container ww.Release(sw) return nil @@ -244,82 +254,76 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { for { select { case <-tt.C: - ww.RLock() + ww.mu.RLock() // that might be one of the workers is working. To proceed, all workers should be inside a channel - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec - ww.RUnlock() + if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec + ww.mu.RUnlock() continue } - ww.RUnlock() + ww.mu.RUnlock() // All workers at this moment are in the container - // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.Lock() + // Pop operation is blocked; push can't be done, since it's not possible to pop + ww.mu.Lock() wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } - wg.Wait() + w.Callback() + }) - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } + ww.workers.Delete(key) + return true + }) + + wg.Wait() ww.container.ResetDone() // todo: rustatian, do we need this mutex? 
- ww.Unlock() + ww.mu.Unlock() - return atomic.LoadUint64(&ww.numWorkers) + return ww.numWorkers.Load() case <-ctx.Done(): // kill workers - ww.Lock() + ww.mu.Lock() // drain workers slice wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) - // stop the worker - _ = v.Stop() + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) + // kill the worker + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } + w.Callback() + }) - wg.Wait() - - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } + ww.workers.Delete(key) + return true + }) + wg.Wait() ww.container.ResetDone() - ww.Unlock() + ww.mu.Unlock() - return atomic.LoadUint64(&ww.numWorkers) + return ww.numWorkers.Load() } } } // Destroy all underlying containers (but let them complete the task) func (ww *WorkerWatcher) Destroy(ctx context.Context) { - ww.Lock() + ww.mu.Lock() // do not release new workers ww.container.Destroy() - ww.Unlock() + ww.mu.Unlock() tt := time.NewTicker(time.Second * 1) // destroy container; we don't use ww mutex here, since we should be able to push worker @@ -327,65 +331,59 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { for { select { case <-tt.C: - ww.RLock() + ww.mu.RLock() // that might be one of the workers is working - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec - ww.RUnlock() + if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec + ww.mu.RUnlock() continue } - ww.RUnlock() + ww.mu.RUnlock() // All workers at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.Lock() + ww.mu.Lock() // drain a channel, this operation will not actually pop, only drain a channel _, _ = ww.container.Pop(ctx) wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() - }(pid, w) - } + _ = w.Stop() + // remove worker from the channel + w.Callback() + }) + ww.workers.Delete(key) + return true + }) wg.Wait() - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } - - ww.Unlock() + ww.mu.Unlock() return case <-ctx.Done(): // kill workers - ww.Lock() + ww.mu.Lock() wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } + w.Callback() + }) + ww.workers.Delete(key) + return true + }) wg.Wait() - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } - - ww.Unlock() + ww.mu.Unlock() return } } @@ -393,18 +391,16 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { // List - this is O(n) operation, and it will return copy of the actual workers func (ww *WorkerWatcher) List() []*worker.Process { - ww.RLock() - defer ww.RUnlock() - - if atomic.LoadUint64(&ww.numWorkers) 
== 0 { + if ww.numWorkers.Load() == 0 { return nil } base := make([]*worker.Process, 0, 2) - for _, w := range ww.workers { - base = append(base, w) - } + ww.workers.Range(func(key, value any) bool { + base = append(base, value.(*worker.Process)) + return true + }) return base } @@ -416,9 +412,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { } // remove worker - ww.Lock() - delete(ww.workers, w.Pid()) - ww.Unlock() + ww.workers.Delete(w.Pid()) if w.State().Compare(fsm.StateDestroyed) { // worker was manually destroyed, no need to replace @@ -429,7 +423,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { err = ww.Allocate() if err != nil { ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - if atomic.LoadUint64(&ww.numWorkers) == 0 { + if ww.numWorkers.Load() == 0 { panic("no workers available, can't run the application") }