Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions ruby/lib/ci/queue/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,35 @@ def queue_exhausted?
@queue.exhausted?
end

# Record a failed test's error payload in the in-memory report map.
# stat_delta is accepted for interface parity with the Redis-backed
# implementation but is unused here (no per-worker stats to correct).
# Always returns true: in this single-process record we are always the
# acknowledging worker.
def record_error(id, payload, stat_delta: nil)
  error_reports[id] = payload
  true
end

# Clear any previously recorded error for this test id.
# skip_flaky_record and acknowledge are accepted for interface parity with
# the Redis-backed implementation; this local record has no flaky tracking
# or distributed acknowledgement. Always returns true.
def record_success(id, skip_flaky_record: false, acknowledge: true)
  error_reports.delete(id)
  true
end

# No-op: a requeue needs no bookkeeping in this in-memory record.
# Returns true so callers can treat the event as recorded.
def record_requeue(id)
  true
end

# Merge a snapshot of build stats into the in-memory stats hash,
# overwriting any stat names already present. Nil input is ignored.
def record_stats(builds_stats)
  stats.merge!(builds_stats) if builds_stats
end

# Incrementally apply a {stat_name => amount} delta to the in-memory stats.
# Entries whose value is neither Numeric nor a numeric-looking string are
# skipped. The pipeline keyword exists for interface parity with the
# Redis-backed implementation and is unused here.
def record_stats_delta(delta, pipeline: nil)
  return if delta.nil? || delta.empty?

  delta.each do |name, amount|
    numeric = amount.is_a?(Numeric) || amount.to_s.match?(/\A-?\d+\.?\d*\z/)
    next unless numeric

    stats[name] = (stats[name] || 0).to_f + amount.to_f
  end
end

# Return a {stat_name => Float} hash for the requested stat names.
# Missing stats come back as 0.0 (nil.to_f). Returns a Hash (not an array
# of pairs) so callers can index by stat name.
def fetch_stats(stat_names)
  stat_names.zip(stats.values_at(*stat_names).map(&:to_f)).to_h
end

def reset_stats(stat_names)
Expand All @@ -47,11 +64,6 @@ def worker_errors
private

attr_reader :stats

def record_stats(builds_stats)
return unless builds_stats
stats.merge!(builds_stats)
end
end
end
end
99 changes: 77 additions & 22 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,30 +56,71 @@ def record_warning(type, attributes)
redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
end

def record_error(id, payload, stats: nil)
acknowledged, _ = redis.pipelined do |pipeline|

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current code uses one pipeline: acknowledge + record_stats together. The new code makes more round trips, which may cause a performance regression.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if you want to keep it pipelined you need to inline it into the lua script which is a lot more complicated as you cannot rely on the result of the previous command in a pipeline. I don't think it's a significant performance regression so I prefer simplicity here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. We can start simple first.

@queue.acknowledge(id, error: payload, pipeline: pipeline)
record_stats(stats, pipeline: pipeline)
def record_error(id, payload, stat_delta: nil)
# Run acknowledge first so we know whether we're the first to ack
acknowledged = @queue.acknowledge(id, error: payload)

if acknowledged
# We were the first to ack; another worker already ack'd would get falsy from SADD
@queue.increment_test_failed
# Only the acknowledging worker's stats include this failure (others skip increment when ack=false).
# Store so we can subtract it if another worker records success later.
store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any?
end

@queue.increment_test_failed if acknowledged == 1
nil
# Return so caller can roll back local counter when not acknowledged
!!acknowledged
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update the other implementations of record_error too?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think so.

  1. lib/ci/queue/build_record.rb (base BuildRecord): Static/local queue, single process, no Redis, no distributed workers.
  2. lib/ci/queue/redis/grind_record.rb (GrindRecord): Grind mode (run tests many times). Different model from the main queue. It just lpushes onto an error list and updates stats. No “processed” set or SADD-based first-ack semantics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is: are they used with the reporter? They all return nil now, which means we would always decrement the error count.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!! is this needed?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!! does not change whether the value is “success” or “failure”. It only turns that into a real boolean.

acknowledge.lua already returns a boolean, so the !! is not a must. However, this can make sure the future change to acknowledge.lua will be safe.

end

# Record a passing test run.
#
# Atomically (single MULTI): acknowledge the test, delete any stored error
# report, and read both the requeue count and the stored stat delta of the
# worker that previously acknowledged a failure.
#
# When this success replaces a previous failure, the stored delta is
# subtracted from that worker's stats so the failure no longer counts
# (see apply_error_report_delta_correction), and the delta entry is removed.
#
# Returns true when this run should be counted: either we acknowledged the
# test ourselves, or we replaced a previously recorded failure.
def record_success(id, skip_flaky_record: false)
  acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction|
    @queue.acknowledge(id, pipeline: transaction)
    transaction.hdel(key('error-reports'), id)
    transaction.hget(key('requeues-count'), id)
    transaction.hget(key('error-report-deltas'), id)
  end
  # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution
  if error_reports_deleted_count.to_i > 0 && delta_json
    apply_error_report_delta_correction(delta_json)
    redis.hdel(key('error-report-deltas'), id)
  end
  record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
  # Count this run when we ack'd or when we replaced a failure (so stats delta is applied)
  !!(acknowledged || error_reports_deleted_count.to_i > 0)
end

def record_requeue(id, stats: nil)
redis.pipelined do |pipeline|
record_stats(stats, pipeline: pipeline)
# No-op: requeues need no per-event Redis writes in this record.
# Returns true so callers can treat the event as recorded.
def record_requeue(id)
  true
end

# Persist a full stats snapshot for this worker: one Redis hash per stat,
# keyed by worker id, refreshed with the configured TTL. When no pipeline
# is supplied, the writes are batched in a fresh pipeline.
def record_stats(stats = nil, pipeline: nil)
  return unless stats

  if pipeline.nil?
    redis.pipelined do |batched|
      record_stats(stats, pipeline: batched)
    end
  else
    stats.each do |name, value|
      pipeline.hset(key(name), config.worker_id, value)
      pipeline.expire(key(name), config.redis_ttl)
    end
  end
end

# Apply a delta to this worker's stats in Redis (HINCRBY). Use this instead of
# record_stats when recording per-test so we never overwrite and correction sticks.
# Apply a {stat_name => amount} delta to this worker's stats in Redis using
# HINCRBYFLOAT, so concurrent updates accumulate instead of overwriting each
# other (unlike record_stats, which writes absolute values). Entries whose
# value is neither Numeric nor a numeric-looking string are skipped.
# When no pipeline is supplied, the writes are batched in a fresh pipeline.
def record_stats_delta(delta, pipeline: nil)
  return if delta.nil? || delta.empty?

  if pipeline.nil?
    redis.pipelined do |batched|
      record_stats_delta(delta, pipeline: batched)
    end
  else
    delta.each do |name, amount|
      numeric = amount.is_a?(Numeric) || amount.to_s.match?(/\A-?\d+\.?\d*\z/)
      next unless numeric

      pipeline.hincrbyfloat(key(name), config.worker_id.to_s, amount.to_f)
      pipeline.expire(key(name), config.redis_ttl)
    end
  end
end

Expand Down Expand Up @@ -130,17 +171,31 @@ def reset_stats(stat_names)

attr_reader :config, :redis

def record_stats(stats, pipeline: redis)
return unless stats
stats.each do |stat_name, stat_value|
pipeline.hset(key(stat_name), config.worker_id, stat_value)
pipeline.expire(key(stat_name), config.redis_ttl)
end
end

# Build a (shortened) Redis key namespaced under the current build id.
def key(*parts)
  KeyShortener.key(config.build_id, *parts)
end

# Remember the acknowledging worker's stat contribution for a failed test so
# that record_success can subtract it later if another worker passes the test.
# Stored as JSON ({'worker_id' => ..., stat_name => amount, ...}) in the
# 'error-report-deltas' hash, refreshed with the configured TTL.
def store_error_report_delta(test_id, stat_delta)
  deltas_key = key('error-report-deltas')
  payload = { 'worker_id' => config.worker_id.to_s }.merge(stat_delta)
  redis.hset(deltas_key, test_id, JSON.generate(payload))
  redis.expire(deltas_key, config.redis_ttl)
end

# Undo a previously stored failure contribution: parse the JSON payload
# written by store_error_report_delta, extract the worker id, and subtract
# each stat amount from that worker's stats via HINCRBYFLOAT. Bails out if
# the worker id is missing/blank or nothing remains to subtract; skips
# entries whose value is neither Numeric nor a numeric-looking string.
def apply_error_report_delta_correction(delta_json)
  stored = JSON.parse(delta_json)
  worker_id = stored.delete('worker_id')&.to_s
  return if worker_id.nil? || worker_id.empty? || stored.empty?

  redis.pipelined do |batched|
    stored.each do |name, amount|
      numeric = amount.is_a?(Numeric) || amount.to_s.match?(/\A-?\d+\.?\d*\z/)
      next unless numeric

      batched.hincrbyfloat(key(name), worker_id, -amount.to_f)
      batched.expire(key(name), config.redis_ttl)
    end
  end
end
end
end
end
Expand Down
28 changes: 16 additions & 12 deletions ruby/lib/ci/queue/redis/grind_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,32 @@ def initialize(queue, redis, config)
@config = config
end

def record_error(payload, stats: nil)
# Push a failed run's payload onto the grind error-report list and refresh
# the list's TTL, batched in a single pipeline. Returns nil.
def record_error(payload)
  redis.pipelined do |pipeline|
    pipeline.lpush(
      key('error-reports'),
      payload,
    )
    pipeline.expire(key('error-reports'), config.redis_ttl)
  end
  nil
end

# No-op: grind mode records nothing per passing run
# (stats are written separately via record_stats).
def record_success
end

# Persist a full stats snapshot for this worker: one Redis hash per stat,
# keyed by worker id, refreshed with the configured TTL. When no pipeline
# is supplied, the writes are batched in a fresh pipeline.
def record_stats(stats, pipeline: nil)
  return unless stats

  if pipeline.nil?
    redis.pipelined do |batched|
      record_stats(stats, pipeline: batched)
    end
  else
    stats.each do |name, value|
      pipeline.hset(key(name), config.worker_id, value)
      pipeline.expire(key(name), config.redis_ttl)
    end
  end
end

def record_warning(_,_)
Expand Down Expand Up @@ -54,14 +66,6 @@ def pop_warnings
def key(*args)
KeyShortener.key(config.build_id, *args)
end

def record_stats(stats, pipeline: redis)
return unless stats
stats.each do |stat_name, stat_value|
pipeline.hset(key(stat_name), config.worker_id, stat_value)
pipeline.expire(key(stat_name), config.redis_ttl)
end
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/ci/queue/redis/key_shortener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module KeyShortener
'queue' => 'q',
'owners' => 'o',
'error-reports' => 'e',
'error-report-deltas' => 'ed',
'requeues-count' => 'rc',
'assertions' => 'a',
'errors' => 'er',
Expand Down
46 changes: 32 additions & 14 deletions ruby/lib/minitest/queue/build_status_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,46 @@ def record(test)
super

self.total_time = Minitest.clock_time - start_time
if test.requeued?
self.requeues += 1
elsif test.skipped?
self.skips += 1
elsif test.error?
self.errors += 1
elsif test.failure
self.failures += 1
end

# Determine what type of result this is and record it
test_id = "#{test.klass}##{test.name}"
delta = delta_for(test)

stats = COUNTERS.zip(COUNTERS.map { |c| send(c) }).to_h
if (test.failure || test.error?) && !test.skipped?
build.record_error("#{test.klass}##{test.name}", dump(test), stats: stats)
acknowledged = if (test.failure || test.error?) && !test.skipped?
build.record_error(test_id, dump(test), stat_delta: delta)
elsif test.requeued?
build.record_requeue("#{test.klass}##{test.name}", stats: stats)
build.record_requeue(test_id)
else
build.record_success("#{test.klass}##{test.name}", stats: stats, skip_flaky_record: test.skipped?)
build.record_success(test_id, skip_flaky_record: test.skipped?)
end

if acknowledged
if (test.failure || test.error?) && !test.skipped?
test.error? ? self.errors += 1 : self.failures += 1
elsif test.requeued?
self.requeues += 1
elsif test.skipped?
self.skips += 1
end
# Apply delta to Redis (record_success returns true when ack'd or when we replaced a failure)
build.record_stats_delta(delta)
end
end

private

# Build the per-test stats delta: assertion count, elapsed time, and a 1 in
# exactly one outcome bucket (errors/failures/requeues/skips), mirroring the
# outcome classification used when recording the test.
def delta_for(test)
  delta = {
    'assertions' => (test.assertions || 0).to_i,
    'errors' => 0,
    'failures' => 0,
    'skips' => 0,
    'requeues' => 0,
    'total_time' => test.time.to_f,
  }

  failed = (test.failure || test.error?) && !test.skipped?
  if failed
    bucket = test.error? ? 'errors' : 'failures'
    delta[bucket] = 1
  elsif test.requeued?
    delta['requeues'] = 1
  elsif test.skipped?
    delta['skips'] = 1
  end

  delta
end

# Serialize a test result into an ErrorReport payload using the
# class-configured failure formatter.
def dump(test)
  failure_details = self.class.failure_formatter.new(test).to_h
  ErrorReport.new(failure_details).dump
end
Expand Down
6 changes: 3 additions & 3 deletions ruby/lib/minitest/queue/grind_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def record(test)
private

# Record one grind run: failed/errored (and not skipped) runs store an error
# report, everything else is a success; then persist the counter snapshot.
def record_test(test)
  if (test.failure || test.error?) && !test.skipped?
    build.record_error(dump(test))
  else
    build.record_success
  end
  build.record_stats(self.class.counters)
end

def increment_counter(test)
Expand Down
25 changes: 20 additions & 5 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true
require 'optparse'
require 'json'
require 'fileutils'
require 'minitest/queue'
require 'ci/queue'
require 'digest/md5'
Expand Down Expand Up @@ -242,16 +243,16 @@ def bisect_command
puts

File.write('log/test_order.log', failing_order.to_a.map(&:id).join("\n"))

bisect_test_details = failing_order.to_a.map do |test|
source_location = test.source_location
file_path = source_location&.first || 'unknown'
line_number = source_location&.last || -1
"#{test.id} #{file_path}:#{line_number}"
end

File.write('log/bisect_test_details.log', bisect_test_details.join("\n"))

exit! 0
end
end
Expand Down Expand Up @@ -336,8 +337,22 @@ def display_warnings(build)
warnings = build.pop_warnings.map do |type, attributes|
attributes.merge(type: type)
end.compact
File.open(queue_config.warnings_file, 'w') do |f|
JSON.dump(warnings, f)

return if warnings.empty?

begin
# Ensure directory exists
dir = File.dirname(queue_config.warnings_file)
FileUtils.mkdir_p(dir) unless File.directory?(dir)

# Write each warning as a separate JSON line (JSONL format)
File.open(queue_config.warnings_file, 'a') do |f|
warnings.each do |warning|
f.puts(JSON.dump(warning))
end
end
rescue => error
STDERR.puts "Failed to write warnings: #{error.message}"
end
end

Expand Down
Loading
Loading