Only increment stats when the worker acknowledged the test #373
Changes from all commits, shown as a unified diff:

@@ -56,30 +56,71 @@ def record_warning(type, attributes)
           redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
         end
 
-        def record_error(id, payload, stats: nil)
-          acknowledged, _ = redis.pipelined do |pipeline|
-            @queue.acknowledge(id, error: payload, pipeline: pipeline)
-            record_stats(stats, pipeline: pipeline)
-          end
-          @queue.increment_test_failed if acknowledged == 1
-          nil
+        def record_error(id, payload, stat_delta: nil)
+          # Run acknowledge first so we know whether we're the first to ack.
+          acknowledged = @queue.acknowledge(id, error: payload)
+
+          if acknowledged
+            # We were the first to ack; if another worker had already ack'd, SADD would have returned falsy.
+            @queue.increment_test_failed
+            # Only the acknowledging worker's stats include this failure (the others skip the increment when the ack is false).
+            # Store the delta so we can subtract it if another worker records a success later.
+            store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any?
+          end
+
+          # Return so the caller can roll back its local counter when not acknowledged.
+          !!acknowledged
Inline review thread on the !!acknowledged return value:

Contributor (author): Do we need to update the other implementations of record_error too?

Reply: Don't think so.

Contributor (author): What I mean is: are they used with the reporter, because they all return

Reply: !! does not change whether the value is “success” or “failure”; it only turns that value into a real boolean. acknowledge.lua already returns a boolean, so the !! is not a must. However, it makes sure that a future change to acknowledge.lua stays safe.
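For readers unfamiliar with the idiom, a minimal Ruby illustration of the coercion being discussed (the sample values are hypothetical, not replies from the actual queue code):

    acknowledged = 1     # e.g. a raw integer reply from a Redis command
    !!acknowledged       # => true
    !!nil                # => false
    !!false              # => false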
The diff continues:

         end
 
-        def record_success(id, stats: nil, skip_flaky_record: false)
-          _, error_reports_deleted_count, requeued_count, _ = redis.multi do |transaction|
+        def record_success(id, skip_flaky_record: false)
+          acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction|
             @queue.acknowledge(id, pipeline: transaction)
             transaction.hdel(key('error-reports'), id)
             transaction.hget(key('requeues-count'), id)
-            record_stats(stats, pipeline: transaction)
+            transaction.hget(key('error-report-deltas'), id)
           end
+          # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution.
+          if error_reports_deleted_count.to_i > 0 && delta_json
+            apply_error_report_delta_correction(delta_json)
+            redis.hdel(key('error-report-deltas'), id)
+          end
           record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
-          nil
+          # Count this run when we ack'd, or when we replaced a failure (so the stats delta is applied).
+          !!(acknowledged || error_reports_deleted_count.to_i > 0)
         end
 
-        def record_requeue(id, stats: nil)
-          redis.pipelined do |pipeline|
-            record_stats(stats, pipeline: pipeline)
-          end
+        def record_requeue(id)
           true
         end
 
+        def record_stats(stats = nil, pipeline: nil)
+          return unless stats
+          if pipeline
+            stats.each do |stat_name, stat_value|
+              pipeline.hset(key(stat_name), config.worker_id, stat_value)
+              pipeline.expire(key(stat_name), config.redis_ttl)
+            end
+          else
+            redis.pipelined do |p|
+              record_stats(stats, pipeline: p)
+            end
+          end
+        end
+
+        # Apply a delta to this worker's stats in Redis (HINCRBYFLOAT). Use this instead of
+        # record_stats when recording per-test, so we never overwrite and the correction sticks.
+        def record_stats_delta(delta, pipeline: nil)
+          return if delta.nil? || delta.empty?
+          apply_delta = lambda do |p|
+            delta.each do |stat_name, value|
+              next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/)
+              p.hincrbyfloat(key(stat_name), config.worker_id.to_s, value.to_f)
+              p.expire(key(stat_name), config.redis_ttl)
+            end
+          end
+          if pipeline
+            apply_delta.call(pipeline)
+          else
+            redis.pipelined { |p| apply_delta.call(p) }
+          end
+        end
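Why record_stats_delta uses HINCRBYFLOAT rather than HSET: increments accumulate per hash field, so a later negative correction adjusts the stored value instead of being clobbered by an absolute write. A rough redis-rb sketch with invented key and field names (the real keys go through key()):

    require "redis"
    redis = Redis.new

    # Absolute writes overwrite each other: the last HSET wins.
    redis.hset("stats:assertions", "worker:7", 12)
    redis.hset("stats:assertions", "worker:7", 3)
    redis.hget("stats:assertions", "worker:7")               # => "3"

    # Increments accumulate, so a later negative correction still takes effect.
    redis.hincrbyfloat("stats:failures", "worker:7", 1.0)    # failure recorded
    redis.hincrbyfloat("stats:failures", "worker:7", -1.0)   # corrected after another worker passes the test
    redis.hget("stats:failures", "worker:7")                 # => "0"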
The second hunk updates the private helpers:

@@ -130,17 +171,31 @@ def reset_stats(stat_names)
 
         attr_reader :config, :redis
 
-        def record_stats(stats, pipeline: redis)
-          return unless stats
-          stats.each do |stat_name, stat_value|
-            pipeline.hset(key(stat_name), config.worker_id, stat_value)
-            pipeline.expire(key(stat_name), config.redis_ttl)
-          end
-        end
-
         def key(*args)
           KeyShortener.key(config.build_id, *args)
         end
+
+        def store_error_report_delta(test_id, stat_delta)
+          # Only the acknowledging worker's stats include this test; store their delta for correction on success.
+          payload = { 'worker_id' => config.worker_id.to_s }.merge(stat_delta)
+          redis.hset(key('error-report-deltas'), test_id, JSON.generate(payload))
+          redis.expire(key('error-report-deltas'), config.redis_ttl)
+        end
+
+        def apply_error_report_delta_correction(delta_json)
+          delta = JSON.parse(delta_json)
+          worker_id = delta.delete('worker_id')&.to_s
+          return if worker_id.nil? || worker_id.empty? || delta.empty?
+
+          redis.pipelined do |pipeline|
+            delta.each do |stat_name, value|
+              next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/)
+
+              pipeline.hincrbyfloat(key(stat_name), worker_id, -value.to_f)
+              pipeline.expire(key(stat_name), config.redis_ttl)
+            end
+          end
+        end
       end
     end
 end
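To make the correction concrete, here is a rough sketch of the Redis operations involved, with invented worker ids, stat names, and values (in the real code the keys are prefixed with the build id via key()):

    require "redis"
    require "json"

    redis = Redis.new

    # What store_error_report_delta would write when worker 5 is the first
    # to acknowledge the failing test (numbers invented for illustration):
    delta = { "worker_id" => "5", "assertions" => 3, "errors" => 1 }
    redis.hset("error-report-deltas", "test-42", JSON.generate(delta))

    # What apply_error_report_delta_correction would do when another worker
    # later records a success for the same test:
    stored = JSON.parse(redis.hget("error-report-deltas", "test-42"))
    worker_id = stored.delete("worker_id")
    stored.each do |stat_name, value|
      redis.hincrbyfloat(stat_name, worker_id, -value.to_f)  # subtract the failed run's contribution
    end
    redis.hdel("error-report-deltas", "test-42")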
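And a hypothetical caller-side sketch of how the new boolean returns could be used; the ReporterSketch class and its counter are invented for illustration and are not the project's actual reporter API:

    # Assumes `build` responds to record_error as shown in the diff above.
    class ReporterSketch
      def initialize(build)
        @build = build
        @failures = 0
      end

      def report_failure(test_id, payload, stat_delta)
        @failures += 1
        acknowledged = @build.record_error(test_id, payload, stat_delta: stat_delta)
        # Another worker already acknowledged this test: roll back the local
        # counter so only the acknowledging worker's stats include the failure.
        @failures -= 1 unless acknowledged
        acknowledged
      end
    end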
Review thread on dropping the shared pipeline:

Comment: The current code uses one pipeline: acknowledge and record_stats together. The new code makes more round trips, which may cause a performance regression.

Reply: Yes, if you want to keep it pipelined you need to inline it into the Lua script, which is a lot more complicated because you cannot rely on the result of a previous command within a pipeline. I don't think it's a significant performance regression, so I prefer simplicity here.

Reply: Makes sense. We can start simple first.
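For background on the point about pipelines: with redis-rb, commands issued inside a pipelined block return future objects whose values only become available after the whole pipeline has been flushed, so a later command in the same block cannot branch on an earlier reply. A rough sketch, assuming redis-rb and an invented key name:

    require "redis"
    redis = Redis.new

    results = redis.pipelined do |pipeline|
      first_ack = pipeline.sadd("acknowledged-tests", "test-123")
      first_ack.class   # => Redis::Future
      # Reading first_ack.value here would raise, because the reply does not
      # exist yet; we therefore cannot conditionally increment stats inside
      # the same pipeline based on whether the SADD acknowledged the test.
    end

    results.first       # the SADD reply, available once the pipeline ran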