Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
matrix:
php: ["8.4"]
go: [stable]
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v6 # action page: <https://github.com/actions/setup-go>
Expand Down Expand Up @@ -42,14 +42,8 @@ jobs:
run: go mod download

- name: Run golang tests with coverage on Linux
if: runner.os == 'Linux'
run: make test_coverage

- name: Run golang tests with coverage on Windows
if: runner.os == 'Windows'
run: make test_coverage
shell: bash

- uses: codecov/codecov-action@v5 # Docs: <https://github.com/codecov/codecov-action>
with:
files: ./coverage-ci/summary.txt
Expand Down
16 changes: 9 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#!/usr/bin/make
# Makefile readme (ru): <http://linux.yaroslavl.ru/docs/prog/gnu_make_3-79_russian_manual.html>
# Makefile readme (en): <https://www.gnu.org/software/make/manual/html_node/index.html#SEC_Contents>

SHELL = /bin/sh

test_coverage:
rm -rf coverage-ci
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/pool_static.out -covermode=atomic ./pool/static_pool
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/channel.out -covermode=atomic ./worker_watcher/container/channel
go test -v -race -cover -tags=debug -timeout 30m -coverpkg=./... -coverprofile=./coverage-ci/ratelimiter.out -covermode=atomic ./pool/ratelimiter
echo 'mode: atomic' > ./coverage-ci/summary.txt
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt

Expand All @@ -21,4 +21,6 @@ test: ## Run application tests
go test -v -race ./pool/static_pool
go test -v -race ./worker
go test -v -race ./worker_watcher
go test -v -race ./worker_watcher/container/channel
go test -v -race ./pool/ratelimiter
go test -v -race -fuzz=FuzzStaticPoolEcho -fuzztime=30s -tags=debug ./pool/static_pool
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/roadrunner-server/pool

go 1.24.0
go 1.25

require (
github.com/roadrunner-server/errors v1.4.1
Expand All @@ -19,11 +19,11 @@ require (
github.com/kr/pretty v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.15 // indirect
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/sys v0.39.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=
github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA=
github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI=
github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso=
github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ=
github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw=
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
Expand All @@ -46,6 +52,8 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
Expand Down
12 changes: 7 additions & 5 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const maxWorkers = 2048

// Config .. Pool config Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Expand Down Expand Up @@ -64,7 +66,7 @@ func (cfg *Config) InitDefaults() {

// initialize the dynamic allocator
if cfg.DynamicAllocatorOpts != nil {
cfg.DynamicAllocatorOpts.InitDefaults()
cfg.DynamicAllocatorOpts.InitDefaults(cfg.NumWorkers)
}
}

Expand Down Expand Up @@ -94,14 +96,14 @@ type DynamicAllocationOpts struct {
IdleTimeout time.Duration `mapstructure:"idle_timeout"`
}

func (d *DynamicAllocationOpts) InitDefaults() {
func (d *DynamicAllocationOpts) InitDefaults(numWorkers uint64) {
if d.MaxWorkers == 0 {
d.MaxWorkers = 10
}

// limit max workers to 100
if d.MaxWorkers > 100 {
d.MaxWorkers = 100
// limit max workers to 1000 dynamically allocated workers
if d.MaxWorkers+numWorkers > maxWorkers {
d.MaxWorkers = maxWorkers - numWorkers
}

if d.SpawnRate == 0 {
Expand Down
40 changes: 40 additions & 0 deletions pool/ratelimiter/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ratelimiter

import (
"sync"
"time"
)

type RateLimiter struct {
mu sync.Mutex
available bool
cooldown time.Duration
}

func NewRateLimiter(cooldown time.Duration) *RateLimiter {
return &RateLimiter{
available: true,
cooldown: cooldown,
}
}

// TryAcquire attempts to take the token. Returns false immediately if unavailable.
func (rl *RateLimiter) TryAcquire() bool {
rl.mu.Lock()
defer rl.mu.Unlock()

if rl.available {
rl.available = false
return true
}
return false
}

// Release returns the token after the cooldown period.
func (rl *RateLimiter) Release() {
time.AfterFunc(rl.cooldown, func() {
rl.mu.Lock()
rl.available = true
rl.mu.Unlock()
})
}
126 changes: 126 additions & 0 deletions pool/ratelimiter/ratelimiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package ratelimiter

import (
"sync"
"sync/atomic"
"testing"
"time"
)

func TestRateLimiter_BasicAcquireRelease(t *testing.T) {
rl := NewRateLimiter(1 * time.Second)

// First acquire should succeed
if !rl.TryAcquire() {
t.Error("expected first TryAcquire to succeed")
}

// Release and wait for cooldown
rl.Release()
time.Sleep(2 * time.Second)

// Should be available again
if !rl.TryAcquire() {
t.Error("expected TryAcquire to succeed after cooldown")
}
}

func TestRateLimiter_CooldownRestoresAvailability(t *testing.T) {
cooldown := 1 * time.Second
rl := NewRateLimiter(cooldown)

// Acquire the token
if !rl.TryAcquire() {
t.Fatal("initial TryAcquire should succeed")
}

// Release it
rl.Release()

// Should not be available immediately
if rl.TryAcquire() {
t.Error("token should not be available immediately after release")
}

// Wait for cooldown to complete
time.Sleep(cooldown + 1*time.Second)

// Now it should be available
if !rl.TryAcquire() {
t.Error("token should be available after cooldown period")
}
}

func TestRateLimiter_ZeroCooldown(t *testing.T) {
rl := NewRateLimiter(0)

// Acquire the token
if !rl.TryAcquire() {
t.Fatal("initial TryAcquire should succeed")
}

// Release with zero cooldown
rl.Release()

// With zero cooldown, AfterFunc fires immediately (or very soon)
time.Sleep(1 * time.Second)

// Should be available again almost immediately
if !rl.TryAcquire() {
t.Error("token should be available after zero cooldown")
}
}

func TestRateLimiter_MultipleAcquireWhenUnavailable(t *testing.T) {
rl := NewRateLimiter(5 * time.Second)

// Acquire the token
if !rl.TryAcquire() {
t.Fatal("initial TryAcquire should succeed")
}

// Multiple attempts should all fail
for i := range 10 {
if rl.TryAcquire() {
t.Errorf("TryAcquire attempt %d should have failed", i)
}
}
}

func TestRateLimiter_RaceCondition(t *testing.T) {
rl := NewRateLimiter(1 * time.Second)

const numGoroutines = 100
const iterations = 10

var successCount atomic.Int64
var wg sync.WaitGroup

wg.Add(numGoroutines)
for range numGoroutines {
go func() {
defer wg.Done()
for range iterations {
if rl.TryAcquire() {
successCount.Add(1)
rl.Release()
}
}
}()
}

wg.Wait()

if successCount.Load() == 0 {
t.Error("expected at least one successful acquires")
}

time.Sleep(2 * time.Second)

// After all goroutines finish and cooldowns complete, token should be available
rl.mu.Lock()
if !rl.available {
t.Error("rate limiter should be in available state after all operations complete")
}
rl.mu.Unlock()
}
2 changes: 1 addition & 1 deletion pool/static_pool/debug.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package static_pool //nolint:stylecheck
package static_pool

import (
"context"
Expand Down
Loading
Loading