From 27a0ae278f5305546fe89d3052b4ece984d49361 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 11:17:54 +0200 Subject: [PATCH 01/18] fix: dynamic allocator rework Signed-off-by: Valery Piashchynski --- go.mod | 2 +- pool/static_pool/debug.go | 2 +- pool/static_pool/dyn_allocator.go | 80 ++++++----------- pool/static_pool/dyn_allocator_test.go | 114 +++++++++++++++++++++++++ pool/static_pool/fuzz_test.go | 2 +- pool/static_pool/options.go | 2 +- pool/static_pool/pool.go | 42 ++++----- pool/static_pool/pool_test.go | 2 +- pool/static_pool/stream.go | 2 +- pool/static_pool/supervisor.go | 2 +- pool/static_pool/supervisor_test.go | 2 +- tests/slow_req.php | 27 ++++++ worker_watcher/worker_watcher.go | 4 +- 13 files changed, 196 insertions(+), 87 deletions(-) create mode 100644 pool/static_pool/dyn_allocator_test.go create mode 100644 tests/slow_req.php diff --git a/go.mod b/go.mod index 78e0e2a..1f19412 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/roadrunner-server/pool -go 1.24.0 +go 1.25 require ( github.com/roadrunner-server/errors v1.4.1 diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index a3813e4..4a423ab 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 78918b4..20af0df 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -1,8 +1,8 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" - stderr "errors" + "fmt" "sync" "sync/atomic" "time" @@ -21,12 +21,11 @@ type dynAllocator struct { idleTimeout time.Duration // internal - currAllocated atomic.Pointer[uint64] - mu *sync.Mutex - execLock *sync.RWMutex - ttlTriggerChan chan struct{} - started atomic.Pointer[bool] - log *zap.Logger + currAllocated atomic.Pointer[uint64] + mu 
*sync.Mutex + execLock *sync.RWMutex + started atomic.Pointer[bool] + log *zap.Logger // pool ww *worker_watcher.WorkerWatcher allocator func() (*worker.Process, error) @@ -41,16 +40,15 @@ func newDynAllocator( execLock *sync.RWMutex, cfg *pool.Config) *dynAllocator { da := &dynAllocator{ - maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, - spawnRate: cfg.DynamicAllocatorOpts.SpawnRate, - idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, - mu: &sync.Mutex{}, - ttlTriggerChan: make(chan struct{}, 1), - ww: ww, - execLock: execLock, - allocator: alloc, - log: log, - stopCh: stopCh, + maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, + spawnRate: cfg.DynamicAllocatorOpts.SpawnRate, + idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, + mu: &sync.Mutex{}, + ww: ww, + execLock: execLock, + allocator: alloc, + log: log, + stopCh: stopCh, } da.currAllocated.Store(p(uint64(0))) @@ -62,7 +60,7 @@ func newDynAllocator( func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { const op = errors.Op("allocate_dynamically") - // obtain an operation lock + // operation lock // we can use a lock-free approach here, but it's not necessary da.mu.Lock() defer da.mu.Unlock() @@ -76,36 +74,18 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { // start the dynamic allocator listener da.dynamicTTLListener() da.started.Store(p(true)) - } else { - da.log.Debug("dynamic allocator listener already started, trying to allocate worker immediately with 2s timeout") - // if the listener was started we can try to get the worker with a very short timeout, which was probably allocated by the previous NoFreeWorkers error - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - w, err := da.ww.Take(ctx) - cancel() - if err != nil { - if errors.Is(errors.NoFreeWorkers, err) { - goto allocate - } - - return nil, errors.E(op, err) - } - - return w, nil } -allocate: - // otherwise, we can try to allocate a new batch of workers - // if we already 
allocated max workers, we can't allocate more - if *da.currAllocated.Load() >= da.maxWorkers { + if *da.currAllocated.Load() == da.maxWorkers { // can't allocate more - return nil, errors.E(op, stderr.New("can't allocate more workers, increase max_workers option (max_workers limit is 100)")) + return nil, errors.E(op, fmt.Errorf("can't allocate more workers, increase max_workers option (max_workers limit is %d)", da.maxWorkers), errors.NoFreeWorkers) } - // we starting from the 1 because we already allocated one worker which would be released in the Exec function + // we're starting from the 1 because we already allocated one worker which would be released in the Exec function // i < da.spawnRate - we can't allocate more workers than the spawn rate for i := uint64(0); i < da.spawnRate; i++ { - // spawn as much workers as user specified in the spawn rate configuration, but not more than max workers + // spawn as many workers as the user specified in the spawn rate configuration, but not more than max workers if *da.currAllocated.Load() >= da.maxWorkers { break } @@ -115,14 +95,7 @@ allocate: return nil, errors.E(op, err) } - // reset ttl after every alloated worker - select { - case da.ttlTriggerChan <- struct{}{}: - case <-time.After(time.Minute): - return nil, errors.E(op, stderr.New("failed to reset the TTL listener")) - } - - // increase number of additionally allocated options + // increase the number of additionally allocated options _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) } @@ -170,9 +143,9 @@ func (da *dynAllocator) dynamicTTLListener() { } // reset the number of allocated workers - // potential problem: if we'd have an error in the da.ww.Take code block, we'd still have the currAllocated > 0 + // potential problem: if we had an error in the da.ww.Take code block, we'd still have the currAllocated > 0 - // decrease number 
of additionally allocated options + // decrease the number of additionally allocated options _ = da.currAllocated.Swap(p(*da.currAllocated.Load() - 1)) da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) } @@ -185,11 +158,6 @@ func (da *dynAllocator) dynamicTTLListener() { da.execLock.Unlock() triggerTTL.Stop() goto exit - - // when this channel is triggered, we should extend the TTL of all dynamically allocated workers - case <-da.ttlTriggerChan: - da.log.Debug("TTL trigger received, extending TTL of all dynamically allocated workers") - triggerTTL.Reset(da.idleTimeout) } } exit: diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go new file mode 100644 index 0000000..daa5f08 --- /dev/null +++ b/pool/static_pool/dyn_allocator_test.go @@ -0,0 +1,114 @@ +package static_pool + +import ( + "context" + "os/exec" + "sync" + "testing" + "time" + + "github.com/roadrunner-server/pool/ipc/pipe" + "github.com/roadrunner-server/pool/payload" + "github.com/roadrunner-server/pool/pool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +var dynlog = func() *zap.Logger { + logger, _ := zap.NewDevelopment() + return logger +} + +var testDynCfg = &pool.Config{ + NumWorkers: 5, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 10, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 25, + SpawnRate: 5, + IdleTimeout: 10, + }, +} + +func Test_DynAllocator(t *testing.T) { + ctx := context.Background() + np, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(dynlog()), + testDynCfg, + dynlog(), + ) + assert.NoError(t, err) + assert.NotNil(t, np) + + r, err := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + resp := <-r + + 
assert.Equal(t, []byte("hello"), resp.Body()) + assert.NoError(t, err) + + np.Destroy(ctx) +} + +func Test_DynAllocatorManyReq(t *testing.T) { + var testDynCfgMany = &pool.Config{ + NumWorkers: 5, + MaxJobs: 2, + AllocateTimeout: time.Second * 1, + DestroyTimeout: time.Second * 10, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 25, + SpawnRate: 5, + IdleTimeout: time.Second * 10, + }, + } + + ctx := context.Background() + np, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/client.php", "slow_req", "pipes") + }, + pipe.NewPipeFactory(dynlog()), + testDynCfgMany, + dynlog(), + ) + assert.NoError(t, err) + assert.NotNil(t, np) + + wg := &sync.WaitGroup{} + wg.Add(10000) + go func() { + for range 10000 { + go func() { + defer wg.Done() + r, erre := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + if erre != nil { + t.Log("failed request: ", erre.Error()) + return + } + resp := <-r + require.Equal(t, []byte("hello"), resp.Body()) + }() + } + }() + + go func() { + for range 10 { + time.Sleep(time.Second) + errr := np.Reset(context.Background()) + assert.NoError(t, errr) + } + }() + + wg.Wait() + + time.Sleep(time.Second * 15) + + assert.Equal(t, 5, len(np.Workers())) + + np.Destroy(ctx) +} diff --git a/pool/static_pool/fuzz_test.go b/pool/static_pool/fuzz_test.go index 42fcf0c..408efe3 100644 --- a/pool/static_pool/fuzz_test.go +++ b/pool/static_pool/fuzz_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" diff --git a/pool/static_pool/options.go b/pool/static_pool/options.go index f2084aa..f715982 100644 --- a/pool/static_pool/options.go +++ b/pool/static_pool/options.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "go.uber.org/zap" diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 3c4b6c0..0939bad 100644 --- a/pool/static_pool/pool.go +++ 
b/pool/static_pool/pool.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" @@ -19,11 +19,11 @@ import ( ) const ( - // StopRequest can be sent by worker to indicate that restart is required. + // StopRequest can be sent by a worker to indicate that restart is required. StopRequest = `{"stop":true}` ) -// Pool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +// Pool controls worker creation, destruction and task routing. Pool uses a fixed number of workers. type Pool struct { // pool configuration cfg *pool.Config @@ -31,7 +31,7 @@ type Pool struct { log *zap.Logger // worker command creator cmd pool.Command - // creates and connects to stack + // creates and connects to workers factory pool.Factory // manages worker states and TTLs ww *workerWatcher.WorkerWatcher @@ -48,7 +48,7 @@ type Pool struct { mu sync.RWMutex } -// NewPool creates a new worker pool and task multiplexer. Pool will initiate with one worker. If supervisor configuration is provided -> pool will be turned into a supervisedExec mode +// NewPool creates a new worker pool and task multiplexer. Pool will initialize with the configured number of workers. 
If supervisor configuration is provided -> pool will be turned into a supervisedExec mode func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *pool.Config, log *zap.Logger, options ...Options) (*Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") @@ -94,12 +94,12 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p } } - // set up workers allocator + // set up workers' allocator p.allocator = pool.NewPoolAllocator(ctx, p.cfg.AllocateTimeout, p.cfg.MaxJobs, factory, cmd, p.cfg.Command, p.log) - // set up workers watcher + // set up workers' watcher p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout) - // allocate requested number of workers + // allocate the requested number of workers workers, err := pool.AllocateParallel(p.cfg.NumWorkers, p.allocator) if err != nil { return nil, err @@ -133,7 +133,7 @@ func (sp *Pool) GetConfig() *pool.Config { return sp.cfg } -// Workers returns worker list associated with the pool. +// Workers return a worker list associated with the pool. 
func (sp *Pool) Workers() (workers []*worker.Process) { return sp.ww.List() } @@ -186,7 +186,7 @@ func (sp *Pool) Exec(ctx context.Context, p *payload.Payload, stopCh chan struct } /* - register request in the QUEUE + register a request in the QUEUE */ atomic.AddUint64(&sp.queue, 1) defer atomic.AddUint64(&sp.queue, ^uint64(0)) @@ -222,7 +222,7 @@ begin: // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - // for this case, worker already killed in the ExecTTL function + // in this case, the worker already killed in the ExecTTL function sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err)) w.State().Transition(fsm.StateExecTTLReached) @@ -230,7 +230,7 @@ begin: return nil, err case errors.Is(errors.SoftJob, err): /* - in case of soft job error, we should not kill the worker, this is just an error payload from the worker. + in case of soft job error, we should not kill the worker; this is just an error payload from the worker. 
*/ w.State().Transition(fsm.StateReady) sp.log.Warn("soft worker error", zap.String("reason", "SoftJob"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerSoftError.String()), zap.Error(err)) @@ -260,7 +260,7 @@ begin: } } - // worker want's to be terminated + // worker wants to be terminated // unsafe is used to quickly transform []byte to string if len(rsp.Body) == 0 && unsafe.String(unsafe.SliceData(rsp.Context), len(rsp.Context)) == StopRequest { w.State().Transition(fsm.StateInvalid) @@ -271,14 +271,14 @@ begin: switch { case rsp.Flags&frame.STREAM != 0: sp.log.Debug("stream mode", zap.Int64("pid", w.Pid())) - // create channel for the stream (only if there are no errors) + // create a channel for the stream (only if there are no errors) // we need to create a buffered channel to prevent blocking // stream buffer size should be bigger than regular, to have some payloads ready (optimization) resp := make(chan *PExec, 5) // send the initial frame resp <- newPExec(rsp, nil) - // in case of stream we should not return worker back immediately + // in case of stream, we should not return worker back immediately go func() { // would be called on Goexit defer func() { @@ -290,7 +290,7 @@ begin: // stream iterator for { select { - // we received stop signal + // we received a stop signal case <-stopCh: sp.log.Debug("stream stop signal received", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) @@ -384,7 +384,7 @@ func (sp *Pool) NumDynamic() uint64 { return *sp.dynamicAllocator.currAllocated.Load() } -// Destroy all underlying stack (but let them complete the task). +// Destroy all underlying workers (but let them complete the task). 
func (sp *Pool) Destroy(ctx context.Context) { sp.log.Info("destroy signal received", zap.Duration("timeout", sp.cfg.DestroyTimeout)) var cancel context.CancelFunc @@ -422,7 +422,7 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr // Get function consumes context with timeout w, err := sp.ww.Take(ctxGetFree) if err != nil { - // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + // if the error is of kind NoFreeWorkers, it means that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { sp.log.Error( "no free workers in the pool, wait timeout exceed", @@ -431,14 +431,14 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr zap.Error(err), ) - // if we don't have dynamic allocator or in debug mode, we can't allocate a new worker + // if we don't have a dynamic allocator or in debug mode, we can't allocate a new worker if sp.cfg.DynamicAllocatorOpts == nil || sp.cfg.Debug { return nil, errors.E(op, errors.NoFreeWorkers) } - // for the dynamic allocator, we can would have many requests waiting at the same time on the lock in the dyn allocator + // for the dynamic allocator, we would have many requests waiting at the same time on the lock in the dyn allocator // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically - // however, requests waiting for the lock, won't allocate a new worker and would be failed + // however, requests waiting for the lock won't allocate a new worker and would be failed return sp.dynamicAllocator.allocateDynamically() } diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index ff171c8..2af0477 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool 
import ( "context" diff --git a/pool/static_pool/stream.go b/pool/static_pool/stream.go index 9e6fa07..3b58aae 100644 --- a/pool/static_pool/stream.go +++ b/pool/static_pool/stream.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import "github.com/roadrunner-server/pool/payload" diff --git a/pool/static_pool/supervisor.go b/pool/static_pool/supervisor.go index f38bc3b..4a8d9d4 100644 --- a/pool/static_pool/supervisor.go +++ b/pool/static_pool/supervisor.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "time" diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index f1bef47..865917d 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -1,4 +1,4 @@ -package static_pool //nolint:stylecheck +package static_pool import ( "context" diff --git a/tests/slow_req.php b/tests/slow_req.php new file mode 100644 index 0000000..8fcf3c8 --- /dev/null +++ b/tests/slow_req.php @@ -0,0 +1,27 @@ +waitPayload()) { + try { + $counter++; + + if ($counter % 2 === 0) { + $rr->error('test error'); + continue; + } + + sleep(100); + $rr->respond(new RoadRunner\Payload((string)$in->body)); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +} diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 39de596..0199ae4 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:stylecheck +package worker_watcher import ( "context" @@ -253,7 +253,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { } ww.RUnlock() // All workers at this moment are in the container - // Pop operation is blocked, push can't be done, since it's not possible to pop + // Pop operation is blocked; push can't be done, since it's not possible to pop ww.Lock() wg := &sync.WaitGroup{} From aff32734973e01620fffb20fc22ac73f7d4aa53b Mon Sep 17 00:00:00 2001 
From: Valery Piashchynski Date: Mon, 20 Oct 2025 13:52:27 +0200 Subject: [PATCH 02/18] chore: update atomic.Pointer Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 40 ++++++++++++++----------------- pool/static_pool/pool.go | 2 +- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 20af0df..39b37f5 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -21,10 +21,10 @@ type dynAllocator struct { idleTimeout time.Duration // internal - currAllocated atomic.Pointer[uint64] + currAllocated atomic.Uint64 mu *sync.Mutex execLock *sync.RWMutex - started atomic.Pointer[bool] + started atomic.Bool log *zap.Logger // pool ww *worker_watcher.WorkerWatcher @@ -51,8 +51,8 @@ func newDynAllocator( stopCh: stopCh, } - da.currAllocated.Store(p(uint64(0))) - da.started.Store(p(false)) + da.currAllocated.Store(0) + da.started.Store(false) return da } @@ -70,14 +70,14 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { zap.Uint64("max_workers", da.maxWorkers), zap.Uint64("spawn_rate", da.spawnRate)) - if !*da.started.Load() { + if !da.started.Load() { // start the dynamic allocator listener da.dynamicTTLListener() - da.started.Store(p(true)) + da.started.Store(true) } // if we already allocated max workers, we can't allocate more - if *da.currAllocated.Load() == da.maxWorkers { + if da.currAllocated.Load() == da.maxWorkers { // can't allocate more return nil, errors.E(op, fmt.Errorf("can't allocate more workers, increase max_workers option (max_workers limit is %d)", da.maxWorkers), errors.NoFreeWorkers) } @@ -86,7 +86,7 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { // i < da.spawnRate - we can't allocate more workers than the spawn rate for i := uint64(0); i < da.spawnRate; i++ { // spawn as many workers as the user specified in the spawn rate configuration, but not more than max 
workers - if *da.currAllocated.Load() >= da.maxWorkers { + if da.currAllocated.Load() >= da.maxWorkers { break } @@ -96,8 +96,8 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { } // increase the number of additionally allocated options - _ = da.currAllocated.Swap(p(*da.currAllocated.Load() + 1)) - da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) + _ = da.currAllocated.Swap(da.currAllocated.Load() + 1) + da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) @@ -119,6 +119,7 @@ func (da *dynAllocator) dynamicTTLListener() { goto exit // when this channel is triggered, we should deallocate all dynamically allocated workers case <-triggerTTL.C: + triggerTTL.Stop() da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) // get the Exec (the whole operation) lock da.execLock.Lock() @@ -126,13 +127,13 @@ func (da *dynAllocator) dynamicTTLListener() { da.mu.Lock() // if we don't have any dynamically allocated workers, we can skip the deallocation - if *da.currAllocated.Load() == 0 { + if da.currAllocated.Load() == 0 { da.mu.Unlock() da.execLock.Unlock() goto exit } - alloc := *da.currAllocated.Load() + alloc := da.currAllocated.Load() for range alloc { // take the worker from the stack, inifinite timeout // we should not block here forever @@ -146,26 +147,21 @@ func (da *dynAllocator) dynamicTTLListener() { // potential problem: if we had an error in the da.ww.Take code block, we'd still have the currAllocated > 0 // decrease the number of additionally allocated options - _ = da.currAllocated.Swap(p(*da.currAllocated.Load() - 1)) - da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", *da.currAllocated.Load())) + _ = da.currAllocated.Swap(da.currAllocated.Load() - 1) + 
da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) } - if *da.currAllocated.Load() != 0 { - da.log.Error("failed to deallocate all dynamically allocated workers", zap.Uint64("remaining", *da.currAllocated.Load())) + if da.currAllocated.Load() != 0 { + da.log.Error("failed to deallocate all dynamically allocated workers", zap.Uint64("remaining", da.currAllocated.Load())) } da.mu.Unlock() da.execLock.Unlock() - triggerTTL.Stop() goto exit } } exit: - da.started.Store(p(false)) + da.started.Store(false) da.log.Debug("dynamic allocator listener exited, all dynamically allocated workers deallocated") }() } - -func p[T any](val T) *T { - return &val -} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 0939bad..3a05ddd 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -381,7 +381,7 @@ func (sp *Pool) NumDynamic() uint64 { return 0 } - return *sp.dynamicAllocator.currAllocated.Load() + return sp.dynamicAllocator.currAllocated.Load() } // Destroy all underlying workers (but let them complete the task). From b957d646a74d0df28a5c123b154d6cec3e5bc5cb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 14:10:42 +0200 Subject: [PATCH 03/18] chore: increase test timeout Signed-off-by: Valery Piashchynski --- Makefile | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index b15d3d1..c581739 100644 --- a/Makefile +++ b/Makefile @@ -7,11 +7,11 @@ SHELL = /bin/sh test_coverage: rm -rf coverage-ci mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket - go test -v -race -cover -tags=debug -coverpkg=./... 
-coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt From 45941fa4d2834dc3f9e1db31aa6075131115447e Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 14:46:29 +0200 Subject: [PATCH 04/18] chore: update test with 500w Signed-off-by: Valery Piashchynski --- pool/static_pool/pool_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index 2af0477..fd415fc 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -392,8 +392,9 @@ func Test_DynamicPool_500W(t *testing.T) { wg.Wait() - require.Len(t, p.Workers(), 101) - time.Sleep(time.Second * 20) + time.Sleep(time.Second * 30) + + require.Len(t, p.Workers(), 1) p.Destroy(ctx) } From f0feafff38deb4bef5a4b27dd1bb43b14a2458f3 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 17:21:24 +0200 
Subject: [PATCH 05/18] chore: use >= just to be safe Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 39b37f5..65cebb5 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -77,7 +77,7 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { } // if we already allocated max workers, we can't allocate more - if da.currAllocated.Load() == da.maxWorkers { + if da.currAllocated.Load() >= da.maxWorkers { // can't allocate more return nil, errors.E(op, fmt.Errorf("can't allocate more workers, increase max_workers option (max_workers limit is %d)", da.maxWorkers), errors.NoFreeWorkers) } From 413102c277b648def121bfb9de6580c98eee7ac2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 17:30:02 +0200 Subject: [PATCH 06/18] chore: update some atomic operations Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 65cebb5..489a90e 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -96,7 +96,7 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { } // increase the number of additionally allocated options - _ = da.currAllocated.Swap(da.currAllocated.Load() + 1) + da.currAllocated.Add(1) da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) } @@ -147,7 +147,7 @@ func (da *dynAllocator) dynamicTTLListener() { // potential problem: if we had an error in the da.ww.Take code block, we'd still have the currAllocated > 0 // decrease the number of additionally allocated options - _ = da.currAllocated.Swap(da.currAllocated.Load() - 1) + _ = 
da.currAllocated.Add(^uint64(0)) // subtract 1 da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) } From d294c9c5b3d664a65d59b451fd81f1a55d8d1dcb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 18:48:22 +0200 Subject: [PATCH 07/18] chore: minor update Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 17 +++++++---------- pool/static_pool/pool.go | 2 +- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 489a90e..32cfa24 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -1,3 +1,6 @@ +// Dynamic allocator for the static pool implementation +// It allocates new workers with batch spawn rate when there are no free workers +// It uses 2 functions: allocateDynamically to allocate new workers and dynamicTTLListener package static_pool import ( @@ -23,7 +26,6 @@ type dynAllocator struct { // internal currAllocated atomic.Uint64 mu *sync.Mutex - execLock *sync.RWMutex started atomic.Bool log *zap.Logger // pool @@ -45,7 +47,6 @@ func newDynAllocator( idleTimeout: cfg.DynamicAllocatorOpts.IdleTimeout, mu: &sync.Mutex{}, ww: ww, - execLock: execLock, allocator: alloc, log: log, stopCh: stopCh, @@ -96,8 +97,8 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { } // increase the number of additionally allocated options - da.currAllocated.Add(1) - da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) + aw := da.currAllocated.Add(1) + da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", aw)) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) @@ -121,15 +122,12 @@ func (da *dynAllocator) dynamicTTLListener() { case <-triggerTTL.C: triggerTTL.Stop() da.log.Debug("dynamic workers TTL", 
zap.String("reason", "idle timeout reached")) - // get the Exec (the whole operation) lock - da.execLock.Lock() // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated da.mu.Lock() // if we don't have any dynamically allocated workers, we can skip the deallocation if da.currAllocated.Load() == 0 { da.mu.Unlock() - da.execLock.Unlock() goto exit } @@ -147,8 +145,8 @@ func (da *dynAllocator) dynamicTTLListener() { // potential problem: if we had an error in the da.ww.Take code block, we'd still have the currAllocated > 0 // decrease the number of additionally allocated options - _ = da.currAllocated.Add(^uint64(0)) // subtract 1 - da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", da.currAllocated.Load())) + nw := da.currAllocated.Add(^uint64(0)) + da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", nw)) } if da.currAllocated.Load() != 0 { @@ -156,7 +154,6 @@ func (da *dynAllocator) dynamicTTLListener() { } da.mu.Unlock() - da.execLock.Unlock() goto exit } } diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 3a05ddd..5ff9585 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -133,7 +133,7 @@ func (sp *Pool) GetConfig() *pool.Config { return sp.cfg } -// Workers return a worker list associated with the pool. +// Workers returns a worker list associated with the pool. 
func (sp *Pool) Workers() (workers []*worker.Process) { return sp.ww.List() } From 07b0571968b81d04559e4ec8a397faf11adfafad Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 20 Oct 2025 20:23:02 +0200 Subject: [PATCH 08/18] chore: small chore Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 1 - pool/static_pool/dyn_allocator_test.go | 10 +++++----- pool/static_pool/pool.go | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 32cfa24..c44e24a 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -39,7 +39,6 @@ func newDynAllocator( ww *worker_watcher.WorkerWatcher, alloc func() (*worker.Process, error), stopCh chan struct{}, - execLock *sync.RWMutex, cfg *pool.Config) *dynAllocator { da := &dynAllocator{ maxWorkers: cfg.DynamicAllocatorOpts.MaxWorkers, diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go index daa5f08..af6376d 100644 --- a/pool/static_pool/dyn_allocator_test.go +++ b/pool/static_pool/dyn_allocator_test.go @@ -59,6 +59,7 @@ func Test_DynAllocatorManyReq(t *testing.T) { MaxJobs: 2, AllocateTimeout: time.Second * 1, DestroyTimeout: time.Second * 10, + ResetTimeout: time.Second * 5, DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ MaxWorkers: 25, SpawnRate: 5, @@ -80,9 +81,9 @@ func Test_DynAllocatorManyReq(t *testing.T) { assert.NotNil(t, np) wg := &sync.WaitGroup{} - wg.Add(10000) + wg.Add(1000) go func() { - for range 10000 { + for range 1000 { go func() { defer wg.Done() r, erre := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) @@ -99,14 +100,13 @@ func Test_DynAllocatorManyReq(t *testing.T) { go func() { for range 10 { time.Sleep(time.Second) - errr := np.Reset(context.Background()) - assert.NoError(t, errr) + _ = np.Reset(context.Background()) } }() wg.Wait() - time.Sleep(time.Second * 15) + 
time.Sleep(time.Second * 30) assert.Equal(t, 5, len(np.Workers())) diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 5ff9585..3f74e0a 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -122,7 +122,7 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p } if p.cfg.DynamicAllocatorOpts != nil { - p.dynamicAllocator = newDynAllocator(p.log, p.ww, p.allocator, p.stopCh, &p.mu, p.cfg) + p.dynamicAllocator = newDynAllocator(p.log, p.ww, p.allocator, p.stopCh, p.cfg) } return p, nil From 456b9eb1ff9a4f6e2da2e459af959bac68c184e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 17 Dec 2025 20:24:48 +0100 Subject: [PATCH 09/18] chore: intermediate commit Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 17 +-- pool/static_pool/pool.go | 2 +- worker_watcher/container/channel/vec.go | 4 +- worker_watcher/worker_watcher.go | 38 ++++--- worker_watcher_v2/channel/vec.go | 139 ++++++++++++++++++++++++ worker_watcher_v2/header.go | 44 ++++++++ worker_watcher_v2/header_test.go | 34 ++++++ 7 files changed, 246 insertions(+), 32 deletions(-) create mode 100644 worker_watcher_v2/channel/vec.go create mode 100644 worker_watcher_v2/header.go create mode 100644 worker_watcher_v2/header_test.go diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index c44e24a..ae2455d 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -5,7 +5,6 @@ package static_pool import ( "context" - "fmt" "sync" "sync/atomic" "time" @@ -57,7 +56,7 @@ func newDynAllocator( return da } -func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { +func (da *dynAllocator) allocateDynamically() { const op = errors.Op("allocate_dynamically") // operation lock @@ -79,7 +78,8 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { // if we already allocated max workers, we can't allocate more if 
da.currAllocated.Load() >= da.maxWorkers { // can't allocate more - return nil, errors.E(op, fmt.Errorf("can't allocate more workers, increase max_workers option (max_workers limit is %d)", da.maxWorkers), errors.NoFreeWorkers) + da.log.Warn("can't allocate more workers, already allocated max workers", zap.Uint64("max_workers", da.maxWorkers)) + return } // we're starting from the 1 because we already allocated one worker which would be released in the Exec function @@ -92,19 +92,14 @@ func (da *dynAllocator) allocateDynamically() (*worker.Process, error) { err := da.ww.AddWorker() if err != nil { - return nil, errors.E(op, err) + da.log.Error("failed to allocate worker", zap.Error(err)) + continue } // increase the number of additionally allocated options aw := da.currAllocated.Add(1) da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", aw)) } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - w, err := da.ww.Take(ctx) - cancel() - - return w, err } func (da *dynAllocator) dynamicTTLListener() { @@ -132,7 +127,7 @@ func (da *dynAllocator) dynamicTTLListener() { alloc := da.currAllocated.Load() for range alloc { - // take the worker from the stack, inifinite timeout + // take the worker from the stack, infinite timeout // we should not block here forever err := da.ww.RemoveWorker(context.Background()) if err != nil { diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 3f74e0a..0be9622 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -440,7 +440,7 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically // however, requests waiting for the lock won't allocate a new worker and would be failed - return sp.dynamicAllocator.allocateDynamically() + sp.dynamicAllocator.allocateDynamically() 
} // else if err not nil - return error return nil, errors.E(op, err) diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 8014fbe..6d3ce0b 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -21,11 +21,11 @@ type Vec struct { workers chan *worker.Process } -func NewVector() *Vec { +func NewVector(numWorkers uint64) *Vec { vec := &Vec{ destroy: 0, reset: 0, - workers: make(chan *worker.Process, 600), + workers: make(chan *worker.Process, numWorkers), } return vec diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 0199ae4..010948f 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -25,7 +25,7 @@ type WorkerWatcher struct { // actually don't have a lot of impl here, so interface not needed container *channel.Vec // used to control Destroy stage (that all workers are in the container) - numWorkers uint64 + numWorkers atomic.Uint64 eventBus *events.Bus // map with the worker's pointers @@ -40,16 +40,18 @@ type WorkerWatcher struct { // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { eb, _ := events.NewEventBus() - return &WorkerWatcher{ - container: channel.NewVector(), - log: log, - eventBus: eb, - // pass a ptr to the number of workers to avoid blocking in the TTL loop - numWorkers: numWorkers, + ww := &WorkerWatcher{ + container: channel.NewVector(numWorkers), + log: log, + eventBus: eb, allocateTimeout: allocateTimeout, workers: make(map[int64]*worker.Process, numWorkers), allocator: allocator, } + + // pass a ptr to the number of workers to avoid blocking in the TTL loop + ww.numWorkers.Store(numWorkers) + return ww } func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { @@ -68,7 +70,7 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) 
error { } func (ww *WorkerWatcher) AddWorker() error { - if atomic.LoadUint64(&ww.numWorkers) >= maxWorkers { + if ww.numWorkers.Load() >= maxWorkers { return errors.E(errors.WorkerAllocate, errors.Str("container is full, maximum number of workers reached")) } @@ -77,13 +79,13 @@ func (ww *WorkerWatcher) AddWorker() error { return err } - atomic.AddUint64(&ww.numWorkers, 1) + ww.numWorkers.Add(1) return nil } func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { // can't remove the last worker - if atomic.LoadUint64(&ww.numWorkers) == 1 { + if ww.numWorkers.Load() == 1 { ww.log.Warn("can't remove the last worker") return nil } @@ -97,7 +99,7 @@ func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { w.State().Transition(fsm.StateDestroyed) _ = w.Stop() - atomic.AddUint64(&ww.numWorkers, ^uint64(0)) + ww.numWorkers.Add(^uint64(0)) ww.Lock() delete(ww.workers, w.Pid()) ww.Unlock() @@ -176,7 +178,7 @@ func (ww *WorkerWatcher) Allocate() error { select { case <-tt: // reduce the number of workers - atomic.AddUint64(&ww.numWorkers, ^uint64(0)) + ww.numWorkers.Add(^uint64(0)) allocateFreq.Stop() // timeout exceeds, worker can't be allocated return errors.E(op, errors.WorkerAllocate, err) @@ -247,7 +249,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { ww.RLock() // that might be one of the workers is working. To proceed, all workers should be inside a channel - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec + if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec ww.RUnlock() continue } @@ -280,7 +282,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { // todo: rustatian, do we need this mutex? 
ww.Unlock() - return atomic.LoadUint64(&ww.numWorkers) + return ww.numWorkers.Load() case <-ctx.Done(): // kill workers ww.Lock() @@ -309,7 +311,7 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { ww.container.ResetDone() ww.Unlock() - return atomic.LoadUint64(&ww.numWorkers) + return ww.numWorkers.Load() } } } @@ -329,7 +331,7 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { case <-tt.C: ww.RLock() // that might be one of the workers is working - if atomic.LoadUint64(&ww.numWorkers) != uint64(ww.container.Len()) { //nolint:gosec + if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec ww.RUnlock() continue } @@ -396,7 +398,7 @@ func (ww *WorkerWatcher) List() []*worker.Process { ww.RLock() defer ww.RUnlock() - if atomic.LoadUint64(&ww.numWorkers) == 0 { + if ww.numWorkers.Load() == 0 { return nil } @@ -429,7 +431,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { err = ww.Allocate() if err != nil { ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - if atomic.LoadUint64(&ww.numWorkers) == 0 { + if ww.numWorkers.Load() == 0 { panic("no workers available, can't run the application") } diff --git a/worker_watcher_v2/channel/vec.go b/worker_watcher_v2/channel/vec.go new file mode 100644 index 0000000..6d3ce0b --- /dev/null +++ b/worker_watcher_v2/channel/vec.go @@ -0,0 +1,139 @@ +package channel + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/pool/fsm" + "github.com/roadrunner-server/pool/worker" +) + +type Vec struct { + rwm sync.RWMutex + // destroy signal + destroy uint64 + // reset signal + reset uint64 + // channel with the workers + workers chan *worker.Process +} + +func NewVector(numWorkers uint64) *Vec { + vec := &Vec{ + destroy: 0, + reset: 0, + workers: make(chan *worker.Process, numWorkers), + } + + return vec +} + +// Push is O(1) operation +// In 
case of TTL and full channel O(n) worst case, where n is len of the channel +func (v *Vec) Push(w *worker.Process) { + // add remove callback + select { + case v.workers <- w: + // default select branch is only possible when dealing with TTL + // because in that case, workers in the v.workers channel can be TTL-ed and killed + // but presenting in the channel + default: + // the channel is full + _ = w.Kill() + } +} + +func (v *Vec) Len() int { + return len(v.workers) +} + +func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { + // remove all workers and return + if atomic.LoadUint64(&v.destroy) == 1 { + return nil, errors.E(errors.WatcherStopped) + } + + // wait for the reset to complete + for atomic.CompareAndSwapUint64(&v.reset, 1, 1) { + select { + case <-ctx.Done(): + default: + time.Sleep(time.Millisecond * 100) + } + } + + // used only for the TTL-ed workers + v.rwm.RLock() + select { + case w := <-v.workers: + v.rwm.RUnlock() + return w, nil + case <-ctx.Done(): + v.rwm.RUnlock() + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } +} + +func (v *Vec) ResetDone() { + atomic.StoreUint64(&v.reset, 0) +} + +func (v *Vec) Reset() { + atomic.StoreUint64(&v.reset, 1) +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} + +func (v *Vec) Remove() { + // Stop Pop operations + v.rwm.Lock() + defer v.rwm.Unlock() + + /* + we can be in the default branch by the following reasons: + 1. TTL is set with no requests during the TTL + 2. Violated Get <-> Release operation (how ??) + */ + + for range len(v.workers) { + /* + We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. 
+ BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers + so, down by the code, we might have a problem when pushing the new worker to the v.workers + */ + wrk := <-v.workers + + switch wrk.State().CurrentState() { + // good states + case fsm.StateWorking, fsm.StateReady: + // put the worker back + // generally, while send and receive operations are concurrent (from the channel), channel behave + // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO + select { + case v.workers <- wrk: + continue + + // all bad states are here + default: + // kill the worker from the channel + wrk.State().Transition(fsm.StateInvalid) + _ = wrk.Kill() + + continue + } + /* + Bad states are here. + */ + default: + // kill the current worker (just to be sure it's dead) + if wrk != nil { + _ = wrk.Kill() + } + } + } +} diff --git a/worker_watcher_v2/header.go b/worker_watcher_v2/header.go new file mode 100644 index 0000000..c4c4b9f --- /dev/null +++ b/worker_watcher_v2/header.go @@ -0,0 +1,44 @@ +package workerwatcherv2 + +import ( + "errors" + "math/bits" + "sync/atomic" + + "github.com/roadrunner-server/pool/worker" +) + +type Header struct { + workers []*worker.Process + bitmask uint64 +} + +func NewHeader(allocWorkers uint) *Header { + if allocWorkers > 64 { + panic("too many workers") + } + + return &Header{ + workers: make([]*worker.Process, allocWorkers), + } +} + +func (h *Header) PopWorker() (*worker.Process, error) { + slot := bits.TrailingZeros64(h.bitmask) + if slot >= len(h.workers) { + return nil, errors.New("no workers") + } + _ = atomic.CompareAndSwapUint64(&h.bitmask, h.bitmask, h.bitmask&(^(1 << slot))) + return h.workers[slot], nil +} + +func (h *Header) PushWorker(w *worker.Process) error { + slot := bits.TrailingZeros64(^h.bitmask) + if len(h.workers) <= slot { + return errors.New("too many workers") + } + + _ = atomic.CompareAndSwapUint64(&h.bitmask, h.bitmask, h.bitmask|(1< Date: Sun, 28 
Dec 2025 20:10:16 +0100 Subject: [PATCH 10/18] feature: update dyn allocator Signed-off-by: Valery Piashchynski --- Makefile | 6 +- go.mod | 3 +- go.sum | 4 + pool/config.go | 6 +- pool/ratelimiter/ratelimiter.go | 40 +++ pool/ratelimiter/ratelimiter_test.go | 126 ++++++++ pool/static_pool/dyn_allocator.go | 100 ++++-- pool/static_pool/dyn_allocator_test.go | 215 +++++++++++++ pool/static_pool/pool.go | 3 +- pool/static_pool/pool_test.go | 322 ------------------- tests/composer.json | 7 +- worker_watcher/container/channel/vec.go | 6 +- worker_watcher/container/channel/vec_test.go | 122 +++++++ worker_watcher/worker_watcher.go | 182 +++++------ worker_watcher_v2/channel/vec.go | 139 -------- worker_watcher_v2/header.go | 44 --- worker_watcher_v2/header_test.go | 34 -- 17 files changed, 686 insertions(+), 673 deletions(-) create mode 100644 pool/ratelimiter/ratelimiter.go create mode 100644 pool/ratelimiter/ratelimiter_test.go create mode 100644 worker_watcher/container/channel/vec_test.go delete mode 100644 worker_watcher_v2/channel/vec.go delete mode 100644 worker_watcher_v2/header.go delete mode 100644 worker_watcher_v2/header_test.go diff --git a/Makefile b/Makefile index c581739..f9720c2 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,4 @@ #!/usr/bin/make -# Makefile readme (ru): -# Makefile readme (en): SHELL = /bin/sh @@ -12,6 +10,8 @@ test_coverage: go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... 
-coverprofile=./coverage-ci/channel.out -covermode=atomic ./worker_watcher/container/channel + go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/ratelimiter.out -covermode=atomic ./pool/ratelimiter echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt @@ -21,4 +21,6 @@ test: ## Run application tests go test -v -race ./pool/static_pool go test -v -race ./worker go test -v -race ./worker_watcher + go test -v -race ./worker_watcher/container/channel + go test -v -race ./pool/ratelimiter go test -v -race -fuzz=FuzzStaticPoolEcho -fuzztime=30s -tags=debug ./pool/static_pool diff --git a/go.mod b/go.mod index 1f19412..e3efa1b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.17.0 + golang.org/x/sync v0.19.0 ) require ( @@ -24,6 +24,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.35.0 // indirect + golang.org/x/time v0.14.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b05ccb9..16a9bbd 100644 --- a/go.sum +++ b/go.sum @@ -42,10 +42,14 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/pool/config.go b/pool/config.go index 46f4d85..4e331f5 100644 --- a/pool/config.go +++ b/pool/config.go @@ -99,9 +99,9 @@ func (d *DynamicAllocationOpts) InitDefaults() { d.MaxWorkers = 10 } - // limit max workers to 100 - if d.MaxWorkers > 100 { - d.MaxWorkers = 100 + // limit max workers to 1000 + if d.MaxWorkers > 1000 { + d.MaxWorkers = 1000 } if d.SpawnRate == 0 { diff --git a/pool/ratelimiter/ratelimiter.go b/pool/ratelimiter/ratelimiter.go new file mode 100644 index 0000000..678bd12 --- /dev/null +++ b/pool/ratelimiter/ratelimiter.go @@ -0,0 +1,40 @@ +package ratelimiter + +import ( + "sync" + "time" +) + +type RateLimiter struct { + mu sync.Mutex + available bool + cooldown time.Duration +} + +func NewRateLimiter(cooldown time.Duration) *RateLimiter { + return &RateLimiter{ + available: true, + cooldown: cooldown, + } +} + +// TryAcquire attempts to take the token. Returns false immediately if unavailable. +func (rl *RateLimiter) TryAcquire() bool { + rl.mu.Lock() + defer rl.mu.Unlock() + + if rl.available { + rl.available = false + return true + } + return false +} + +// Release returns the token after the cooldown period. 
+func (rl *RateLimiter) Release() { + time.AfterFunc(rl.cooldown, func() { + rl.mu.Lock() + rl.available = true + rl.mu.Unlock() + }) +} diff --git a/pool/ratelimiter/ratelimiter_test.go b/pool/ratelimiter/ratelimiter_test.go new file mode 100644 index 0000000..ab919e8 --- /dev/null +++ b/pool/ratelimiter/ratelimiter_test.go @@ -0,0 +1,126 @@ +package ratelimiter + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRateLimiter_BasicAcquireRelease(t *testing.T) { + rl := NewRateLimiter(1 * time.Second) + + // First acquire should succeed + if !rl.TryAcquire() { + t.Error("expected first TryAcquire to succeed") + } + + // Release and wait for cooldown + rl.Release() + time.Sleep(2 * time.Second) + + // Should be available again + if !rl.TryAcquire() { + t.Error("expected TryAcquire to succeed after cooldown") + } +} + +func TestRateLimiter_CooldownRestoresAvailability(t *testing.T) { + cooldown := 1 * time.Second + rl := NewRateLimiter(cooldown) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Release it + rl.Release() + + // Should not be available immediately + if rl.TryAcquire() { + t.Error("token should not be available immediately after release") + } + + // Wait for cooldown to complete + time.Sleep(cooldown + 1*time.Second) + + // Now it should be available + if !rl.TryAcquire() { + t.Error("token should be available after cooldown period") + } +} + +func TestRateLimiter_ZeroCooldown(t *testing.T) { + rl := NewRateLimiter(0) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Release with zero cooldown + rl.Release() + + // With zero cooldown, AfterFunc fires immediately (or very soon) + time.Sleep(1 * time.Second) + + // Should be available again almost immediately + if !rl.TryAcquire() { + t.Error("token should be available after zero cooldown") + } +} + +func TestRateLimiter_MultipleAcquireWhenUnavailable(t *testing.T) { + rl 
:= NewRateLimiter(5 * time.Second) + + // Acquire the token + if !rl.TryAcquire() { + t.Fatal("initial TryAcquire should succeed") + } + + // Multiple attempts should all fail + for i := range 10 { + if rl.TryAcquire() { + t.Errorf("TryAcquire attempt %d should have failed", i) + } + } +} + +func TestRateLimiter_RaceCondition(t *testing.T) { + rl := NewRateLimiter(1 * time.Second) + + const numGoroutines = 100 + const iterations = 10 + + var successCount atomic.Int64 + var wg sync.WaitGroup + + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + for range iterations { + if rl.TryAcquire() { + successCount.Add(1) + rl.Release() + } + } + }() + } + + wg.Wait() + + if successCount.Load() == 0 { + t.Error("expected at least one successful acquires") + } + + time.Sleep(2 * time.Second) + + // After all goroutines finish and cooldowns complete, token should be available + rl.mu.Lock() + if !rl.available { + t.Error("rate limiter should be in available state after all operations complete") + } + rl.mu.Unlock() +} diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index ae2455d..cecb254 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -9,8 +9,8 @@ import ( "sync/atomic" "time" - "github.com/roadrunner-server/errors" "github.com/roadrunner-server/pool/pool" + "github.com/roadrunner-server/pool/pool/ratelimiter" "github.com/roadrunner-server/pool/worker" "github.com/roadrunner-server/pool/worker_watcher" "go.uber.org/zap" @@ -31,6 +31,10 @@ type dynAllocator struct { ww *worker_watcher.WorkerWatcher allocator func() (*worker.Process, error) stopCh chan struct{} + // the case is, that multiple goroutines can call allocateDynamically at the same time + // and we need to omit some NoFreeWorker calls if one is already in progress within the same time frame + rateLimit *ratelimiter.RateLimiter + lastAllocTry atomic.Pointer[time.Time] } func newDynAllocator( @@ -48,6 +52,7 @@ 
func newDynAllocator( allocator: alloc, log: log, stopCh: stopCh, + rateLimit: ratelimiter.NewRateLimiter(time.Second), } da.currAllocated.Store(0) @@ -56,11 +61,20 @@ func newDynAllocator( return da } -func (da *dynAllocator) allocateDynamically() { - const op = errors.Op("allocate_dynamically") +func (da *dynAllocator) addMoreWorkers() { + // set the last allocation try time + // we need to store this to prevent immediate deallocation in the TTL listener + da.lastAllocTry.Store(p(time.Now().UTC())) + + if !da.rateLimit.TryAcquire() { + da.log.Warn("rate limit exceeded for dynamic allocation, skipping") + return + } + + // return the token after 1 second + defer da.rateLimit.Release() // operation lock - // we can use a lock-free approach here, but it's not necessary da.mu.Lock() defer da.mu.Unlock() @@ -71,7 +85,7 @@ func (da *dynAllocator) allocateDynamically() { if !da.started.Load() { // start the dynamic allocator listener - da.dynamicTTLListener() + da.startIdleTTLListener() da.started.Store(true) } @@ -100,59 +114,97 @@ func (da *dynAllocator) allocateDynamically() { aw := da.currAllocated.Add(1) da.log.Debug("allocated additional worker", zap.Uint64("currently additionally allocated", aw)) } + + da.log.Debug("currently allocated", zap.Uint64("number", da.currAllocated.Load())) } -func (da *dynAllocator) dynamicTTLListener() { +func (da *dynAllocator) startIdleTTLListener() { da.log.Debug("starting dynamic allocator listener", zap.Duration("idle_timeout", da.idleTimeout)) go func() { // DynamicAllocatorOpts are read-only, so we can use them without a lock triggerTTL := time.NewTicker(da.idleTimeout) + defer triggerTTL.Stop() + for { select { case <-da.stopCh: da.log.Debug("dynamic allocator listener stopped") - goto exit + // Acquire lock before setting started=false to prevent race with addMoreWorkers + da.mu.Lock() + da.started.Store(false) + da.mu.Unlock() + da.log.Debug("dynamic allocator listener exited") + return // when this channel is triggered, we 
should deallocate all dynamically allocated workers case <-triggerTTL.C: - triggerTTL.Stop() da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) + // check the last allocation time - if we had an allocation recently (within idleTimeout), we should skip deallocation + if da.lastAllocTry.Load() != nil && time.Since(*da.lastAllocTry.Load()) < da.idleTimeout { + da.log.Debug("skipping deallocation of dynamic workers, recent allocation detected") + continue + } + // get the DynamicAllocatorOpts lock to prevent operations on the CurrAllocated da.mu.Lock() // if we don't have any dynamically allocated workers, we can skip the deallocation if da.currAllocated.Load() == 0 { + // Set started=false BEFORE releasing the lock + // This prevents the race condition where addMoreWorkers() sees started=true + // but the listener is about to exit + da.started.Store(false) da.mu.Unlock() - goto exit + da.log.Debug("dynamic allocator listener exited, no workers to deallocate") + return } alloc := da.currAllocated.Load() + da.log.Debug("deallocating dynamically allocated workers", zap.Uint64("to_deallocate", alloc)) + + if alloc >= da.spawnRate { + // deallocate in batches + alloc = da.spawnRate + } + for range alloc { - // take the worker from the stack, infinite timeout - // we should not block here forever - err := da.ww.RemoveWorker(context.Background()) + // Use a context with timeout to prevent indefinite blocking + // The timeout should be reasonable - use idle timeout as a reference + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + err := da.ww.RemoveWorker(ctx) + cancel() + // the only error we can get here is NoFreeWorkers, meaning all workers are busy if err != nil { - da.log.Error("failed to take worker from the stack", zap.Error(err)) - continue + // we should stop deallocation attempts + da.log.Error("failed to remove worker from the pool, stopping deallocation", zap.Error(err)) + // Don't decrement counter if 
removal failed - worker still exists + break } - // reset the number of allocated workers - // potential problem: if we had an error in the da.ww.Take code block, we'd still have the currAllocated > 0 - - // decrease the number of additionally allocated options + // decrease the number of additionally allocated workers nw := da.currAllocated.Add(^uint64(0)) da.log.Debug("deallocated additional worker", zap.Uint64("currently additionally allocated", nw)) } - if da.currAllocated.Load() != 0 { - da.log.Error("failed to deallocate all dynamically allocated workers", zap.Uint64("remaining", da.currAllocated.Load())) + if da.currAllocated.Load() > 0 { + // if we still have allocated workers, we should keep the listener running + da.mu.Unlock() + da.log.Debug("dynamic allocator listener continuing, still have dynamically allocated workers", zap.Uint64("remaining", da.currAllocated.Load())) + continue } + // CRITICAL FIX: Set started=false BEFORE releasing the lock + // This ensures that any allocateDynamically() call that acquires the lock + // after this point will see started=false and start a new listener + da.started.Store(false) + da.lastAllocTry.Store(nil) da.mu.Unlock() - goto exit + da.log.Debug("dynamic allocator listener exited, all dynamically allocated workers deallocated") + return } } - exit: - da.started.Store(false) - da.log.Debug("dynamic allocator listener exited, all dynamically allocated workers deallocated") }() } + +func p[T any](v T) *T { + return &v +} diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go index af6376d..cbe33cb 100644 --- a/pool/static_pool/dyn_allocator_test.go +++ b/pool/static_pool/dyn_allocator_test.go @@ -112,3 +112,218 @@ func Test_DynAllocatorManyReq(t *testing.T) { np.Destroy(ctx) } + +func Test_DynamicPool_OverMax(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 20, + DynamicAllocatorOpts: 
&pool.DynamicAllocationOpts{ + MaxWorkers: 1, + IdleTimeout: time.Second * 15, + SpawnRate: 10, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + t.Log("sending request 1") + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + t.Log("request 1 finished") + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + go func() { + // sleep to ensure the first request is being processed first + // this request should trigger dynamic allocation attempt and return an error + time.Sleep(time.Second) + t.Log("sending request 2") + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + t.Log("waiting for the requests 1 and 2") + wg.Wait() + t.Log("wait 1 and 2 finished") + + wg.Add(2) + // request 3 and 4 should be processed normally, since we have 2 workers now (1 initial + 1 dynamic) + t.Log("starting requests 3 and 4") + require.Len(t, p.Workers(), 2) + + go func() { + t.Log("request 3") + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + t.Log("request 3 finished") + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + go func() { + t.Log("request 4") + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) 
+ select { + case resp := <-r: + t.Log("request 4 finished") + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + t.Log("waiting for the requests 3 and 4") + wg.Wait() + time.Sleep(time.Second * 20) + assert.Len(t, p.Workers(), 1) + p.Destroy(ctx) +} + +func Test_DynamicPool(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 5, + IdleTimeout: time.Second * 15, + SpawnRate: 2, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + }() + + go func() { + time.Sleep(time.Second) + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + wg.Wait() + + time.Sleep(time.Second * 20) + require.Len(t, p.Workers(), 1) + + p.Destroy(ctx) +} + +func Test_DynamicPool_500W(t *testing.T) { + dynAllCfg := &pool.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 10, + IdleTimeout: time.Second * 15, + // should be corrected to 10 by RR + SpawnRate: 11, + }, + } + + ctx := context.Background() + p, err := NewPool( + ctx, + func(cmd []string) 
*exec.Cmd { + return exec.Command("php", "../../tests/worker-slow-dyn.php") + }, + pipe.NewPipeFactory(log()), + dynAllCfg, + log(), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + require.Len(t, p.Workers(), 1) + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + select { + case resp := <-r: + assert.Equal(t, []byte("hello world"), resp.Body()) + assert.NoError(t, err) + case <-time.After(time.Second * 10): + assert.Fail(t, "timeout") + } + + wg.Done() + }() + + go func() { + time.Sleep(time.Second * 1) + _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.Error(t, err) + wg.Done() + }() + + wg.Wait() + + time.Sleep(time.Second * 30) + + require.Len(t, p.Workers(), 1) + p.Destroy(ctx) +} diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 0be9622..5789dd2 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -439,8 +439,7 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr // for the dynamic allocator, we would have many requests waiting at the same time on the lock in the dyn allocator // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically // however, requests waiting for the lock won't allocate a new worker and would be failed - - sp.dynamicAllocator.allocateDynamically() + sp.dynamicAllocator.addMoreWorkers() } // else if err not nil - return error return nil, errors.E(op, err) diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index fd415fc..477115a 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -55,262 +55,6 @@ func Test_NewPool(t *testing.T) { p.Destroy(ctx) } -func Test_DynamicPool_OverMax(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 
1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 20, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 1, - IdleTimeout: time.Second * 15, - SpawnRate: 10, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - wg.Add(2) - time.Sleep(time.Second * 20) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - time.Sleep(time.Second * 20) - - assert.Len(t, 
p.Workers(), 1) - - p.Destroy(ctx) -} - -func Test_DynamicPool_SeveralCalls(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 20, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 5, - IdleTimeout: time.Second * 15, - SpawnRate: 2, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - assert.Equal(t, uint64(2), p.NumDynamic()) - - wg.Add(2) - time.Sleep(time.Second * 20) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), 
resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - assert.Equal(t, uint64(2), p.NumDynamic()) - time.Sleep(time.Second * 20) - - assert.Equal(t, uint64(0), p.NumDynamic()) - assert.Len(t, p.Workers(), 1) - - p.Destroy(ctx) -} - -func Test_DynamicPool(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second, - DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ - MaxWorkers: 5, - IdleTimeout: time.Second * 15, - SpawnRate: 2, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - require.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - time.Sleep(time.Second * 20) - - p.Destroy(ctx) -} - func Test_MaxWorkers(t *testing.T) { dynAllCfg := &pool.Config{ NumWorkers: 501, @@ -332,72 +76,6 @@ func Test_MaxWorkers(t *testing.T) { assert.Nil(t, p) } -func Test_DynamicPool_500W(t *testing.T) { - dynAllCfg := &pool.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second, - DynamicAllocatorOpts: 
&pool.DynamicAllocationOpts{ - MaxWorkers: 100, - IdleTimeout: time.Second * 15, - // should be corrected to 100 by RR - SpawnRate: 101, - }, - } - - ctx := context.Background() - p, err := NewPool( - ctx, - func(cmd []string) *exec.Cmd { - return exec.Command("php", "../../tests/worker-slow-dyn.php") - }, - pipe.NewPipeFactory(log()), - dynAllCfg, - log(), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - require.Len(t, p.Workers(), 1) - - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - assert.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - assert.NoError(t, err) - select { - case resp := <-r: - assert.Equal(t, []byte("hello world"), resp.Body()) - assert.NoError(t, err) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - wg.Done() - }() - - wg.Wait() - - time.Sleep(time.Second * 30) - - require.Len(t, p.Workers(), 1) - p.Destroy(ctx) -} - func Test_NewPoolAddRemoveWorkers(t *testing.T) { testCfg2 := &pool.Config{ NumWorkers: 1, diff --git a/tests/composer.json b/tests/composer.json index 5aecd89..2e14a43 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -3,12 +3,11 @@ "prefer-stable": true, "require": { "nyholm/psr7": "^1.4", - "spiral/roadrunner-http": "^3.5", - "spiral/roadrunner-worker": "^3.5", - "temporal/sdk": ">=1.0", + "spiral/roadrunner-http": "^3.0", + "spiral/roadrunner-worker": "^3.0", "spiral/tokenizer": ">=2.7", "spiral/goridge": "^4.0", - "spiral/roadrunner-metrics": "^2.0" + "spiral/roadrunner-metrics": "^3.0" }, "autoload": { "psr-4": { diff --git a/worker_watcher/container/channel/vec.go 
b/worker_watcher/container/channel/vec.go index 6d3ce0b..24adb98 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -21,11 +21,11 @@ type Vec struct { workers chan *worker.Process } -func NewVector(numWorkers uint64) *Vec { +func NewVector() *Vec { vec := &Vec{ destroy: 0, reset: 0, - workers: make(chan *worker.Process, numWorkers), + workers: make(chan *worker.Process, 2048), } return vec @@ -61,7 +61,7 @@ func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { select { case <-ctx.Done(): default: - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 10) } } diff --git a/worker_watcher/container/channel/vec_test.go b/worker_watcher/container/channel/vec_test.go new file mode 100644 index 0000000..d4856c9 --- /dev/null +++ b/worker_watcher/container/channel/vec_test.go @@ -0,0 +1,122 @@ +package channel + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/pool/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// helper to create a simple worker for testing +func createTestWorker(t *testing.T) *worker.Process { + t.Helper() + cmd := exec.Command("php", "-v") + w, err := worker.InitBaseWorker(cmd) + require.NoError(t, err) + return w +} + +// ==================== General Tests ==================== + +// Test_Vec_PushPop tests basic push and pop operations +func Test_Vec_PushPop(t *testing.T) { + vec := NewVector() + w := createTestWorker(t) + + // Push worker + vec.Push(w) + assert.Equal(t, 1, vec.Len()) + + // Pop worker + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + popped, err := vec.Pop(ctx) + require.NoError(t, err) + assert.Equal(t, w, popped) + assert.Equal(t, 0, vec.Len()) +} + +// Test_Vec_Len tests that Len returns correct count +func Test_Vec_Len(t *testing.T) { + vec := NewVector() + + assert.Equal(t, 0, vec.Len()) + 
+ // Push multiple workers + for range 5 { + w := createTestWorker(t) + vec.Push(w) + } + + assert.Equal(t, 5, vec.Len()) + + // Pop one + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + _, err := vec.Pop(ctx) + require.NoError(t, err) + + assert.Equal(t, 4, vec.Len()) +} + +// ==================== Edge Case Tests ==================== + +// Test_Vec_PopAfterDestroy tests that Pop returns WatcherStopped error after Destroy is called +func Test_Vec_PopAfterDestroy(t *testing.T) { + vec := NewVector() + w := createTestWorker(t) + vec.Push(w) + + // Destroy the vector + vec.Destroy() + + // Pop should fail immediately with WatcherStopped + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + popped, err := vec.Pop(ctx) + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.WatcherStopped, err)) +} + +// Test_Vec_PopWithCanceledContext tests that Pop returns error when context is canceled +func Test_Vec_PopWithCanceledContext(t *testing.T) { + vec := NewVector() + // Don't push any workers - channel is empty + + // Create an already canceled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + popped, err := vec.Pop(ctx) + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.NoFreeWorkers, err)) +} + +// Test_Vec_PopContextTimeout tests that Pop returns error when context times out waiting for worker +func Test_Vec_PopContextTimeout(t *testing.T) { + vec := NewVector() + // Empty channel - no workers to pop + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + popped, err := vec.Pop(ctx) + elapsed := time.Since(start) + + assert.Nil(t, popped) + require.Error(t, err) + assert.True(t, errors.Is(errors.NoFreeWorkers, err)) + // Verify it actually waited for the timeout + assert.True(t, elapsed >= 50*time.Millisecond, 
"should have waited for timeout") +} diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 010948f..fc28226 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -21,7 +21,7 @@ const maxWorkers = 500 type Allocator func() (*worker.Process, error) type WorkerWatcher struct { - sync.RWMutex + mu sync.RWMutex // actually don't have a lot of impl here, so interface not needed container *channel.Vec // used to control Destroy stage (that all workers are in the container) @@ -29,7 +29,8 @@ type WorkerWatcher struct { eventBus *events.Bus // map with the worker's pointers - workers map[int64]*worker.Process + workers sync.Map + // workers map[int64]*worker.Process log *zap.Logger @@ -41,11 +42,11 @@ type WorkerWatcher struct { func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *WorkerWatcher { eb, _ := events.NewEventBus() ww := &WorkerWatcher{ - container: channel.NewVector(numWorkers), + container: channel.NewVector(), log: log, eventBus: eb, allocateTimeout: allocateTimeout, - workers: make(map[int64]*worker.Process, numWorkers), + workers: sync.Map{}, allocator: allocator, } @@ -55,14 +56,14 @@ func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint6 } func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { - ww.Lock() - defer ww.Unlock() + ww.mu.Lock() + defer ww.mu.Unlock() // else we can add all workers for i := range workers { ww.container.Push(workers[i]) // add worker to watch slice - ww.workers[workers[i].Pid()] = workers[i] + ww.workers.Store(workers[i].Pid(), workers[i]) ww.addToWatch(workers[i]) } @@ -70,6 +71,9 @@ func (ww *WorkerWatcher) Watch(workers []*worker.Process) error { } func (ww *WorkerWatcher) AddWorker() error { + ww.mu.Lock() + defer ww.mu.Unlock() + if ww.numWorkers.Load() >= maxWorkers { return errors.E(errors.WorkerAllocate, errors.Str("container is full, maximum number of 
workers reached")) } @@ -84,6 +88,9 @@ func (ww *WorkerWatcher) AddWorker() error { } func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { + ww.mu.Lock() + defer ww.mu.Unlock() + // can't remove the last worker if ww.numWorkers.Load() == 1 { ww.log.Warn("can't remove the last worker") @@ -100,9 +107,7 @@ func (ww *WorkerWatcher) RemoveWorker(ctx context.Context) error { _ = w.Stop() ww.numWorkers.Add(^uint64(0)) - ww.Lock() - delete(ww.workers, w.Pid()) - ww.Unlock() + ww.workers.Delete(w.Pid()) return nil } @@ -202,9 +207,12 @@ done: // add worker to Wait ww.addToWatch(sw) // add a new worker to the worker's slice (to get information about workers in parallel) - ww.Lock() - ww.workers[sw.Pid()] = sw - ww.Unlock() + if w, ok := ww.workers.LoadAndDelete(sw.Pid()); ok { + ww.log.Warn("allocated worker already exists, killing duplicate, report this case", zap.Int64("pid", sw.Pid())) + _ = w.(*worker.Process).Kill() + } + + ww.workers.Store(sw.Pid(), sw) // push the worker to the container ww.Release(sw) return nil @@ -246,70 +254,64 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { for { select { case <-tt.C: - ww.RLock() + ww.mu.RLock() // that might be one of the workers is working. 
To proceed, all workers should be inside a channel if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec - ww.RUnlock() + ww.mu.RUnlock() continue } - ww.RUnlock() + ww.mu.RUnlock() // All workers at this moment are in the container // Pop operation is blocked; push can't be done, since it's not possible to pop - ww.Lock() + ww.mu.Lock() wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } - wg.Wait() + w.Callback() + }) - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } + ww.workers.Delete(key) + return true + }) + + wg.Wait() ww.container.ResetDone() // todo: rustatian, do we need this mutex? 
- ww.Unlock() + ww.mu.Unlock() return ww.numWorkers.Load() case <-ctx.Done(): // kill workers - ww.Lock() + ww.mu.Lock() // drain workers slice wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) - // stop the worker - _ = v.Stop() + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) + // kill the worker + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } + w.Callback() + }) - wg.Wait() - - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } + ww.workers.Delete(key) + return true + }) + wg.Wait() ww.container.ResetDone() - ww.Unlock() + ww.mu.Unlock() return ww.numWorkers.Load() } @@ -318,10 +320,10 @@ func (ww *WorkerWatcher) Reset(ctx context.Context) uint64 { // Destroy all underlying containers (but let them complete the task) func (ww *WorkerWatcher) Destroy(ctx context.Context) { - ww.Lock() + ww.mu.Lock() // do not release new workers ww.container.Destroy() - ww.Unlock() + ww.mu.Unlock() tt := time.NewTicker(time.Second * 1) // destroy container; we don't use ww mutex here, since we should be able to push worker @@ -329,65 +331,59 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { for { select { case <-tt.C: - ww.RLock() + ww.mu.RLock() // that might be one of the workers is working if ww.numWorkers.Load() != uint64(ww.container.Len()) { //nolint:gosec - ww.RUnlock() + ww.mu.RUnlock() continue } - ww.RUnlock() + ww.mu.RUnlock() // All workers at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.Lock() + ww.mu.Lock() // drain a channel, this operation will not actually pop, only drain a channel _, _ = ww.container.Pop(ctx) wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - 
defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() - }(pid, w) - } + _ = w.Stop() + // remove worker from the channel + w.Callback() + }) + ww.workers.Delete(key) + return true + }) wg.Wait() - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } - - ww.Unlock() + ww.mu.Unlock() return case <-ctx.Done(): // kill workers - ww.Lock() + ww.mu.Lock() wg := &sync.WaitGroup{} - for pid, w := range ww.workers { - wg.Add(1) - go func(k int64, v *worker.Process) { - defer wg.Done() - v.State().Transition(fsm.StateDestroyed) + ww.workers.Range(func(key, value any) bool { + w := value.(*worker.Process) + wg.Go(func() { + w.State().Transition(fsm.StateDestroyed) // kill the worker - _ = v.Stop() + _ = w.Stop() // remove worker from the channel - v.Callback() - }(pid, w) - } + w.Callback() + }) + ww.workers.Delete(key) + return true + }) wg.Wait() - // one operation - for k := range ww.workers { - delete(ww.workers, k) - } - - ww.Unlock() + ww.mu.Unlock() return } } @@ -395,18 +391,16 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { // List - this is O(n) operation, and it will return copy of the actual workers func (ww *WorkerWatcher) List() []*worker.Process { - ww.RLock() - defer ww.RUnlock() - if ww.numWorkers.Load() == 0 { return nil } base := make([]*worker.Process, 0, 2) - for _, w := range ww.workers { - base = append(base, w) - } + ww.workers.Range(func(key, value any) bool { + base = append(base, value.(*worker.Process)) + return true + }) return base } @@ -418,9 +412,7 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { } // remove worker - ww.Lock() - delete(ww.workers, w.Pid()) - ww.Unlock() + ww.workers.Delete(w.Pid()) if w.State().Compare(fsm.StateDestroyed) { // worker was manually destroyed, no need to replace diff --git 
a/worker_watcher_v2/channel/vec.go b/worker_watcher_v2/channel/vec.go deleted file mode 100644 index 6d3ce0b..0000000 --- a/worker_watcher_v2/channel/vec.go +++ /dev/null @@ -1,139 +0,0 @@ -package channel - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/roadrunner-server/errors" - "github.com/roadrunner-server/pool/fsm" - "github.com/roadrunner-server/pool/worker" -) - -type Vec struct { - rwm sync.RWMutex - // destroy signal - destroy uint64 - // reset signal - reset uint64 - // channel with the workers - workers chan *worker.Process -} - -func NewVector(numWorkers uint64) *Vec { - vec := &Vec{ - destroy: 0, - reset: 0, - workers: make(chan *worker.Process, numWorkers), - } - - return vec -} - -// Push is O(1) operation -// In case of TTL and full channel O(n) worst case, where n is len of the channel -func (v *Vec) Push(w *worker.Process) { - // add remove callback - select { - case v.workers <- w: - // default select branch is only possible when dealing with TTL - // because in that case, workers in the v.workers channel can be TTL-ed and killed - // but presenting in the channel - default: - // the channel is full - _ = w.Kill() - } -} - -func (v *Vec) Len() int { - return len(v.workers) -} - -func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { - // remove all workers and return - if atomic.LoadUint64(&v.destroy) == 1 { - return nil, errors.E(errors.WatcherStopped) - } - - // wait for the reset to complete - for atomic.CompareAndSwapUint64(&v.reset, 1, 1) { - select { - case <-ctx.Done(): - default: - time.Sleep(time.Millisecond * 100) - } - } - - // used only for the TTL-ed workers - v.rwm.RLock() - select { - case w := <-v.workers: - v.rwm.RUnlock() - return w, nil - case <-ctx.Done(): - v.rwm.RUnlock() - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } -} - -func (v *Vec) ResetDone() { - atomic.StoreUint64(&v.reset, 0) -} - -func (v *Vec) Reset() { - atomic.StoreUint64(&v.reset, 1) -} - -func (v *Vec) 
Destroy() { - atomic.StoreUint64(&v.destroy, 1) -} - -func (v *Vec) Remove() { - // Stop Pop operations - v.rwm.Lock() - defer v.rwm.Unlock() - - /* - we can be in the default branch by the following reasons: - 1. TTL is set with no requests during the TTL - 2. Violated Get <-> Release operation (how ??) - */ - - for range len(v.workers) { - /* - We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. - BUT while we are draining the vector, some worker might be reallocated and pushed into the v.workers - so, down by the code, we might have a problem when pushing the new worker to the v.workers - */ - wrk := <-v.workers - - switch wrk.State().CurrentState() { - // good states - case fsm.StateWorking, fsm.StateReady: - // put the worker back - // generally, while send and receive operations are concurrent (from the channel), channel behave - // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO - select { - case v.workers <- wrk: - continue - - // all bad states are here - default: - // kill the worker from the channel - wrk.State().Transition(fsm.StateInvalid) - _ = wrk.Kill() - - continue - } - /* - Bad states are here. 
- */ - default: - // kill the current worker (just to be sure it's dead) - if wrk != nil { - _ = wrk.Kill() - } - } - } -} diff --git a/worker_watcher_v2/header.go b/worker_watcher_v2/header.go deleted file mode 100644 index c4c4b9f..0000000 --- a/worker_watcher_v2/header.go +++ /dev/null @@ -1,44 +0,0 @@ -package workerwatcherv2 - -import ( - "errors" - "math/bits" - "sync/atomic" - - "github.com/roadrunner-server/pool/worker" -) - -type Header struct { - workers []*worker.Process - bitmask uint64 -} - -func NewHeader(allocWorkers uint) *Header { - if allocWorkers > 64 { - panic("too many workers") - } - - return &Header{ - workers: make([]*worker.Process, allocWorkers), - } -} - -func (h *Header) PopWorker() (*worker.Process, error) { - slot := bits.TrailingZeros64(h.bitmask) - if slot >= len(h.workers) { - return nil, errors.New("no workers") - } - _ = atomic.CompareAndSwapUint64(&h.bitmask, h.bitmask, h.bitmask&(^(1 << slot))) - return h.workers[slot], nil -} - -func (h *Header) PushWorker(w *worker.Process) error { - slot := bits.TrailingZeros64(^h.bitmask) - if len(h.workers) <= slot { - return errors.New("too many workers") - } - - _ = atomic.CompareAndSwapUint64(&h.bitmask, h.bitmask, h.bitmask|(1< Date: Sun, 28 Dec 2025 20:25:01 +0100 Subject: [PATCH 11/18] Update pool/static_pool/dyn_allocator.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pool/static_pool/dyn_allocator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index cecb254..a6969b2 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -1,6 +1,6 @@ // Dynamic allocator for the static pool implementation // It allocates new workers with batch spawn rate when there are no free workers -// It uses 2 functions: allocateDynamically to allocate new workers and dynamicTTLListener +// It uses 2 functions: addMoreWorkers to allocate new workers and 
dynamicTTLListener package static_pool import ( From 2f659cbb95f822ec9c353c50d65c609ed2611bac Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:27:23 +0100 Subject: [PATCH 12/18] chore: implement review suggestions Signed-off-by: Valery Piashchynski --- pool/config.go | 12 +++++++----- pool/static_pool/dyn_allocator.go | 6 +++--- pool/static_pool/pool.go | 2 +- worker_watcher/container/channel/vec.go | 1 + worker_watcher/worker_watcher.go | 2 +- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pool/config.go b/pool/config.go index 4e331f5..bdcb536 100644 --- a/pool/config.go +++ b/pool/config.go @@ -5,6 +5,8 @@ import ( "time" ) +const maxWorkers = 2048 + // Config .. Pool config Configures the pool behavior. type Config struct { // Debug flag creates new fresh worker before every request. @@ -64,7 +66,7 @@ func (cfg *Config) InitDefaults() { // initialize the dynamic allocator if cfg.DynamicAllocatorOpts != nil { - cfg.DynamicAllocatorOpts.InitDefaults() + cfg.DynamicAllocatorOpts.InitDefaults(cfg.NumWorkers) } } @@ -94,14 +96,14 @@ type DynamicAllocationOpts struct { IdleTimeout time.Duration `mapstructure:"idle_timeout"` } -func (d *DynamicAllocationOpts) InitDefaults() { +func (d *DynamicAllocationOpts) InitDefaults(numWorkers uint64) { if d.MaxWorkers == 0 { d.MaxWorkers = 10 } - // limit max workers to 1000 - if d.MaxWorkers > 1000 { - d.MaxWorkers = 1000 + // limit max workers to 1000 dynamically allocated workers + if d.MaxWorkers+numWorkers > maxWorkers { + d.MaxWorkers = maxWorkers - numWorkers } if d.SpawnRate == 0 { diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index cecb254..31084a2 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -1,6 +1,6 @@ // Dynamic allocator for the static pool implementation // It allocates new workers with batch spawn rate when there are no free workers -// It uses 2 functions: allocateDynamically to allocate 
new workers and dynamicTTLListener +// It uses 2 functions: addMoreWorkers to allocate new workers and startIdleTTLListener package static_pool import ( @@ -31,7 +31,7 @@ type dynAllocator struct { ww *worker_watcher.WorkerWatcher allocator func() (*worker.Process, error) stopCh chan struct{} - // the case is, that multiple goroutines can call allocateDynamically at the same time + // the case is, that multiple goroutines can call addMoreWorkers at the same time // and we need to omit some NoFreeWorker calls if one is already in progress within the same time frame rateLimit *ratelimiter.RateLimiter lastAllocTry atomic.Pointer[time.Time] @@ -193,7 +193,7 @@ func (da *dynAllocator) startIdleTTLListener() { } // CRITICAL FIX: Set started=false BEFORE releasing the lock - // This ensures that any allocateDynamically() call that acquires the lock + // This ensures that any addMoreWorkers() call that acquires the lock // after this point will see started=false and start a new listener da.started.Store(false) da.lastAllocTry.Store(nil) diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index 5789dd2..be47a16 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -437,7 +437,7 @@ func (sp *Pool) takeWorker(ctxGetFree context.Context, op errors.Op) (*worker.Pr } // for the dynamic allocator, we would have many requests waiting at the same time on the lock in the dyn allocator - // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the allocateDynamically + // this will lead to the following case - all previous requests would be able to get the worker, since we're allocating them in the addMoreWorkers // however, requests waiting for the lock won't allocate a new worker and would be failed sp.dynamicAllocator.addMoreWorkers() } diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 24adb98..5475c8b 100644 --- 
a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -25,6 +25,7 @@ func NewVector() *Vec { vec := &Vec{ destroy: 0, reset: 0, + // currently, we can have up to 2048 workers in the pool workers: make(chan *worker.Process, 2048), } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index fc28226..adfba4c 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" ) -const maxWorkers = 500 +const maxWorkers = 2048 // Allocator is responsible for worker allocation in the pool type Allocator func() (*worker.Process, error) From 631aed65e6325101cfdf35c7fdb3fac0717bcda5 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:29:32 +0100 Subject: [PATCH 13/18] chore: more review suggestions Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator_test.go | 4 ++-- tests/composer.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go index cbe33cb..c07a74e 100644 --- a/pool/static_pool/dyn_allocator_test.go +++ b/pool/static_pool/dyn_allocator_test.go @@ -27,7 +27,7 @@ var testDynCfg = &pool.Config{ DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ MaxWorkers: 25, SpawnRate: 5, - IdleTimeout: 10, + IdleTimeout: time.Second * 10, }, } @@ -92,7 +92,7 @@ func Test_DynAllocatorManyReq(t *testing.T) { return } resp := <-r - require.Equal(t, []byte("hello"), resp.Body()) + assert.Equal(t, []byte("hello"), resp.Body()) }() } }() diff --git a/tests/composer.json b/tests/composer.json index 2e14a43..844ade6 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -3,8 +3,8 @@ "prefer-stable": true, "require": { "nyholm/psr7": "^1.4", - "spiral/roadrunner-http": "^3.0", - "spiral/roadrunner-worker": "^3.0", + "spiral/roadrunner-http": "^3.5", + "spiral/roadrunner-worker": "^3.5", "spiral/tokenizer": ">=2.7", 
"spiral/goridge": "^4.0", "spiral/roadrunner-metrics": "^3.0" From 4b8d5cca04df163057e084e337bec6b0bb0ad3d9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:31:42 +0100 Subject: [PATCH 14/18] chore: review Signed-off-by: Valery Piashchynski --- pool/static_pool/dyn_allocator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index 31084a2..d344541 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -139,7 +139,8 @@ func (da *dynAllocator) startIdleTTLListener() { case <-triggerTTL.C: da.log.Debug("dynamic workers TTL", zap.String("reason", "idle timeout reached")) // check the last allocation time - if we had an allocation recently (within idleTimeout), we should skip deallocation - if da.lastAllocTry.Load() != nil && time.Since(*da.lastAllocTry.Load()) < da.idleTimeout { + lastAlloc := da.lastAllocTry.Load() + if lastAlloc != nil && time.Since(*lastAlloc) < da.idleTimeout { da.log.Debug("skipping deallocation of dynamic workers, recent allocation detected") continue } From 231b3995b1a4e018e3110bb8ea214da7f0104a25 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:32:53 +0100 Subject: [PATCH 15/18] chore: update deps Signed-off-by: Valery Piashchynski --- go.mod | 9 ++++----- go.sum | 12 ++++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index e3efa1b..5bbda70 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/roadrunner-server/goridge/v3 v3.8.3 github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.11.1 - go.uber.org/zap v1.27.0 + go.uber.org/zap v1.27.1 golang.org/x/sync v0.19.0 ) @@ -19,12 +19,11 @@ require ( github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect - github.com/tklauser/go-sysconf v0.3.15 // indirect - 
github.com/tklauser/numcpus v0.10.0 // indirect + github.com/tklauser/go-sysconf v0.3.16 // indirect + github.com/tklauser/numcpus v0.11.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.35.0 // indirect - golang.org/x/time v0.14.0 // indirect + golang.org/x/sys v0.39.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 16a9bbd..6a1b1a9 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= +github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= +github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= +github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= +github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -40,16 +44,16 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod 
h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= -golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= From 630a683f5974749d37b4a20aeb71c343e7e5b04f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:39:57 +0100 Subject: [PATCH 16/18] chore: update actions Signed-off-by: Valery Piashchynski --- .github/workflows/tests.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ee4c571..5c8f92a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -50,6 +50,10 @@ jobs: run: make test_coverage shell: bash 
+ - name: Run golang tests with coverage on macOS + if: runner.os == 'macOS' + run: make test_coverage + - uses: codecov/codecov-action@v5 # Docs: with: files: ./coverage-ci/summary.txt From bddc55af25cdfd2a78bb625330512b8d6cb8fb2a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 20:50:14 +0100 Subject: [PATCH 17/18] chore: remove windows from gha Signed-off-by: Valery Piashchynski --- .github/workflows/tests.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5c8f92a..4644c2c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,7 @@ jobs: matrix: php: ["8.4"] go: [stable] - os: ["ubuntu-latest", "macos-latest", "windows-latest"] + os: ["ubuntu-latest", "macos-latest"] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v6 # action page: @@ -45,11 +45,6 @@ jobs: if: runner.os == 'Linux' run: make test_coverage - - name: Run golang tests with coverage on Windows - if: runner.os == 'Windows' - run: make test_coverage - shell: bash - - name: Run golang tests with coverage on macOS if: runner.os == 'macOS' run: make test_coverage From 9d4af100ab82c5c0b75db419d571210a52d4306f Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 28 Dec 2025 21:27:04 +0100 Subject: [PATCH 18/18] chore: update gha Signed-off-by: Valery Piashchynski --- .github/workflows/tests.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4644c2c..56f3381 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,7 @@ jobs: matrix: php: ["8.4"] go: [stable] - os: ["ubuntu-latest", "macos-latest"] + os: ["ubuntu-latest"] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v6 # action page: @@ -42,11 +42,6 @@ jobs: run: go mod download - name: Run golang tests with coverage on Linux - if: runner.os == 'Linux' 
- run: make test_coverage - - - name: Run golang tests with coverage on macOS - if: runner.os == 'macOS' run: make test_coverage - uses: codecov/codecov-action@v5 # Docs: