Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions redis/_entry_helpers.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
--- Extract the test id from a queue entry.
-- Entries may be encoded as "<test_id><delimiter><suffix>"; when a non-empty
-- delimiter is given and found in the entry (plain-text search, so delimiter
-- characters are never treated as Lua pattern magic), everything before its
-- first occurrence is the test id. Otherwise the entry itself is the id.
-- @param value string the queue entry
-- @param delimiter string|nil separator between test id and suffix
-- @return string the test id
local function test_id_from_entry(value, delimiter)
  -- Guard against nil AND empty delimiters: string.find(value, '', 1, true)
  -- matches at position 1, which would silently yield an empty test id.
  if delimiter and delimiter ~= '' then
    local pos = string.find(value, delimiter, 1, true)
    if pos then
      return string.sub(value, 1, pos - 1)
    end
  end
  return value
end
15 changes: 8 additions & 7 deletions redis/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ local processed_key = KEYS[2]
local owners_key = KEYS[3]
local error_reports_key = KEYS[4]

local test = ARGV[1]
local error = ARGV[2]
local ttl = ARGV[3]
redis.call('zrem', zset_key, test)
redis.call('hdel', owners_key, test) -- Doesn't matter if it was reclaimed by another workers
local acknowledged = redis.call('sadd', processed_key, test) == 1
local entry = ARGV[1]
local test_id = ARGV[2]
local error = ARGV[3]
local ttl = ARGV[4]
redis.call('zrem', zset_key, entry)
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers
local acknowledged = redis.call('sadd', processed_key, test_id) == 1

if acknowledged and error ~= "" then
redis.call('hset', error_reports_key, test, error)
redis.call('hset', error_reports_key, test_id, error)
redis.call('expire', error_reports_key, ttl)
end

Expand Down
13 changes: 9 additions & 4 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]

local current_time = ARGV[1]
local test = ARGV[2]
local entry = ARGV[2]
local entry_delimiter = ARGV[3]

local test_id = test_id_from_entry(entry, entry_delimiter)

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test) == 1 then
if redis.call('sismember', processed_key, test_id) == 1 then
return false
end

-- we're still the owner of the test, we can bump the timestamp
if redis.call('hget', owners_key, test) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, test)
if redis.call('hget', owners_key, entry) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, entry)
end
23 changes: 12 additions & 11 deletions redis/requeue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ local error_reports_key = KEYS[7]

local max_requeues = tonumber(ARGV[1])
local global_max_requeues = tonumber(ARGV[2])
local test = ARGV[3]
local offset = ARGV[4]
local entry = ARGV[3]
local test_id = ARGV[4]
local offset = ARGV[5]

if redis.call('hget', owners_key, test) == worker_queue_key then
redis.call('hdel', owners_key, test)
if redis.call('hget', owners_key, entry) == worker_queue_key then
redis.call('hdel', owners_key, entry)
end

if redis.call('sismember', processed_key, test) == 1 then
if redis.call('sismember', processed_key, test_id) == 1 then
return false
end

Expand All @@ -24,23 +25,23 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then
return false
end

local requeues = tonumber(redis.call('hget', requeues_count_key, test))
local requeues = tonumber(redis.call('hget', requeues_count_key, test_id))
if requeues and requeues >= max_requeues then
return false
end

redis.call('hincrby', requeues_count_key, '___total___', 1)
redis.call('hincrby', requeues_count_key, test, 1)
redis.call('hincrby', requeues_count_key, test_id, 1)

redis.call('hdel', error_reports_key, test)
redis.call('hdel', error_reports_key, test_id)

local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1]
if pivot then
redis.call('linsert', queue_key, 'BEFORE', pivot, test)
redis.call('linsert', queue_key, 'BEFORE', pivot, entry)
else
redis.call('lpush', queue_key, test)
redis.call('lpush', queue_key, entry)
end

redis.call('zrem', zset_key, test)
redis.call('zrem', zset_key, entry)

return true
6 changes: 5 additions & 1 deletion redis/reserve_lost.lua
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local worker_queue_key = KEYS[3]
local owners_key = KEYS[4]

local current_time = ARGV[1]
local timeout = ARGV[2]
local entry_delimiter = ARGV[3]

local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
if redis.call('sismember', processed_key, test) == 0 then
local test_id = test_id_from_entry(test, entry_delimiter)
if redis.call('sismember', processed_key, test_id) == 0 then
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
Expand Down
27 changes: 27 additions & 0 deletions ruby/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@ minitest-queue --queue redis://example.com run -Itest test/**/*_test.rb

Additionally you can configure the requeue settings (see main README) with `--max-requeues` and `--requeue-tolerance`.

#### Lazy loading (opt-in)

Lazy loading and streaming are currently supported only by `minitest-queue` (not `rspec-queue`).

To reduce worker memory usage, you can enable lazy loading so test files are loaded on-demand:

```bash
minitest-queue --queue redis://example.com --lazy-load run -Itest test/**/*_test.rb
```

You can tune streaming with `--lazy-load-stream-batch-size` (default: 5000) and `--lazy-load-stream-timeout` (default: 300 seconds).

Environment variables:

- `CI_QUEUE_LAZY_LOAD=1`
- `CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE=10000`
- `CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT=300`
- `CI_QUEUE_LAZY_LOAD_TEST_HELPERS=test/test_helper.rb`

Backward-compatible aliases still work:

- `CI_QUEUE_STREAM_BATCH_SIZE`
- `CI_QUEUE_STREAM_TIMEOUT`
- `CI_QUEUE_TEST_HELPERS`

When lazy loading is enabled and debug mode is on, file loading statistics are printed at the end of the run.


If you'd like to centralize the error reporting you can do so with:

Expand Down
21 changes: 21 additions & 0 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
require 'ci/queue/file'
require 'ci/queue/grind'
require 'ci/queue/bisect'
require 'ci/queue/queue_entry'
require 'ci/queue/class_resolver'
require 'ci/queue/file_loader'

module CI
module Queue
Expand All @@ -22,6 +25,18 @@ module Queue
attr_accessor :shuffler, :requeueable

Error = Class.new(StandardError)
ClassNotFoundError = Class.new(Error)

# Raised when a test file cannot be loaded (e.g. during lazy loading).
# Wraps the underlying exception so callers can inspect what failed and why.
class FileLoadError < Error
# file_path: path of the file that failed to load
# original_error: the exception raised while loading it
attr_reader :file_path, :original_error

def initialize(file_path, original_error)
@file_path = file_path
@original_error = original_error
super("Failed to load #{file_path}: #{original_error.class}: #{original_error.message}")
# Reuse the original backtrace so error reports point at the real failure site.
set_backtrace(original_error.backtrace)
end
end

module Warnings
RESERVED_LOST_TEST = :RESERVED_LOST_TEST
Expand All @@ -48,6 +63,11 @@ def shuffle(tests, random)
end
end

# Whether CI_QUEUE_DEBUG is set to a truthy value.
# Unset returns nil; blank, "0" and "false" (case-insensitive, whitespace
# ignored) are treated as off; anything else is on.
def debug?
  raw = ENV['CI_QUEUE_DEBUG']
  return raw if raw.nil?

  flag = raw.strip.downcase
  !(flag.empty? || flag == '0' || flag == 'false')
end

def from_uri(url, config)
uri = URI(url)
implementation = case uri.scheme
Expand All @@ -65,3 +85,4 @@ def from_uri(url, config)
end
end
end

38 changes: 38 additions & 0 deletions ruby/lib/ci/queue/class_resolver.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module CI
  module Queue
    # Resolves a fully-qualified class name (e.g. "Foo::Bar") to its Class
    # object, optionally loading the file that should define it on demand.
    module ClassResolver
      # Resolve +class_name+ to a Class.
      #
      # When the constant is not yet defined and both +file_path+ and
      # +loader+ are given, the loader is asked to load the file once and
      # the lookup is retried.
      #
      # Raises ClassNotFoundError when the name cannot be resolved.
      def self.resolve(class_name, file_path: nil, loader: nil)
        klass = try_direct_lookup(class_name)
        return klass if klass

        if file_path && loader
          loader.load_file(file_path)
          klass = try_direct_lookup(class_name)
          return klass if klass
        end

        raise ClassNotFoundError, "Unable to resolve class #{class_name}"
      end

      # Walk the constant path from Object with ancestor lookup disabled
      # (const_defined?(name, false)) so only the exact namespace matches.
      # Returns nil when any segment is missing or the final constant is
      # not a Class (e.g. a Module).
      def self.try_direct_lookup(class_name)
        parts = class_name.sub(/\A::/, '').split('::')
        # Guard: "" or "::" would leave parts empty and fall through to
        # Object itself, wrongly "resolving" a blank name to Object.
        return nil if parts.empty?

        current = Object
        parts.each do |name|
          return nil unless current.const_defined?(name, false)

          current = current.const_get(name, false)
        end

        return nil unless current.is_a?(Class)

        current
      rescue NameError
        nil
      end
      private_class_method :try_direct_lookup
    end
  end
end
58 changes: 57 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ class Configuration
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
attr_accessor :lazy_load, :lazy_load_stream_batch_size
attr_accessor :lazy_load_streaming_timeout, :lazy_load_test_helpers
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout

class << self
def from_env(env)
lazy_load_value = env['CI_QUEUE_LAZY_LOAD']
lazy_load = lazy_load_value && !lazy_load_value.strip.empty? && !%w(0 false).include?(lazy_load_value.strip.downcase)
new(
build_id: env['CIRCLE_BUILD_URL'] || env['BUILDKITE_BUILD_ID'] || env['TRAVIS_BUILD_ID'] || env['HEROKU_TEST_RUN_ID'] || env['SEMAPHORE_PIPELINE_ID'],
worker_id: env['CIRCLE_NODE_INDEX'] || env['BUILDKITE_PARALLEL_JOB'] || env['CI_NODE_INDEX'] || env['SEMAPHORE_JOB_ID'],
Expand All @@ -22,6 +26,10 @@ def from_env(env)
debug_log: env['CI_QUEUE_DEBUG_LOG'],
max_requeues: env['CI_QUEUE_MAX_REQUEUES']&.to_i || 0,
requeue_tolerance: env['CI_QUEUE_REQUEUE_TOLERANCE']&.to_f || 0,
lazy_load: lazy_load || false,
lazy_load_stream_batch_size: (env['CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE'] || env['CI_QUEUE_STREAM_BATCH_SIZE'])&.to_i,
lazy_load_streaming_timeout: (env['CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT'] || env['CI_QUEUE_STREAM_TIMEOUT'])&.to_i,
lazy_load_test_helpers: env['CI_QUEUE_LAZY_LOAD_TEST_HELPERS'] || env['CI_QUEUE_TEST_HELPERS'],
)
end

Expand All @@ -46,7 +54,8 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil, lazy_load_test_helpers: nil)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@failure_file = failure_file
Expand All @@ -73,6 +82,16 @@ def initialize(
@warnings_file = warnings_file
@debug_log = debug_log
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
@lazy_load = lazy_load
@lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000
@lazy_load_streaming_timeout = lazy_load_streaming_timeout
@lazy_load_test_helpers = lazy_load_test_helpers
end

# Split the configured helper list (comma-separated file paths) into an
# array of trimmed, non-empty paths. Returns [] when not configured.
# Rejecting empty segments makes stray commas ("a,,b", trailing ",")
# harmless instead of producing "" paths that would fail to load.
def lazy_load_test_helper_paths
  return [] unless @lazy_load_test_helpers

  @lazy_load_test_helpers.split(',').map(&:strip).reject(&:empty?)
end

def queue_init_timeout
Expand All @@ -83,6 +102,43 @@ def report_timeout
@report_timeout || timeout
end

# Effective streaming timeout in seconds: the explicitly configured value
# when positive, otherwise the queue init timeout with a 300s floor.
def lazy_load_streaming_timeout
  configured = @lazy_load_streaming_timeout
  return configured if configured && configured > 0

  [queue_init_timeout, 300].max
end

# Backward-compatible aliases for existing callers.
# Each method below simply delegates to its lazy_load_* counterpart so
# code written against the older names keeps working unchanged.

# Alias for #lazy_load_stream_batch_size.
def stream_batch_size
lazy_load_stream_batch_size
end

# Alias for #lazy_load_stream_batch_size=.
def stream_batch_size=(value)
self.lazy_load_stream_batch_size = value
end

# Alias for #lazy_load_streaming_timeout.
def streaming_timeout
lazy_load_streaming_timeout
end

# Alias for #lazy_load_streaming_timeout=.
def streaming_timeout=(value)
self.lazy_load_streaming_timeout = value
end

# Alias for #lazy_load_test_helpers.
def test_helpers
lazy_load_test_helpers
end

# Alias for #lazy_load_test_helpers=.
def test_helpers=(value)
self.lazy_load_test_helpers = value
end

# Alias for #lazy_load_test_helper_paths.
def test_helper_paths
lazy_load_test_helper_paths
end

# Explicitly configured inactive-workers timeout, falling back to the
# general queue timeout when unset.
def inactive_workers_timeout
  configured = @inactive_workers_timeout
  configured || timeout
end
Expand Down
Loading
Loading