diff --git a/lib/request_queue_time/middleware/version.rb b/lib/request_queue_time/middleware/version.rb index 96547ff..644ad93 100644 --- a/lib/request_queue_time/middleware/version.rb +++ b/lib/request_queue_time/middleware/version.rb @@ -1,5 +1,5 @@ module RequestQueueTime class Middleware - VERSION = "0.2.0" + VERSION = "0.3.0" end end diff --git a/lib/services/auto_scaling_metrics/sidekiq_reporter.rb b/lib/services/auto_scaling_metrics/sidekiq_reporter.rb index 8e0ad71..e6af036 100644 --- a/lib/services/auto_scaling_metrics/sidekiq_reporter.rb +++ b/lib/services/auto_scaling_metrics/sidekiq_reporter.rb @@ -1,23 +1,31 @@ module RequestQueueTime module AutoScalingMetrics class SidekiqReporter - def self.enable + # `required_queues` lists queue names that must always publish a datapoint, + # even when Redis has no entry for them. Without this, a queue with a + # CloudWatch autoscaling policy that never enqueues sits permanently in + # INSUFFICIENT_DATA, blocking scale-in for the whole service. + def self.enable(required_queues: []) Sidekiq.configure_server do |config| config.on(:leader) do AutoScalingMetrics::Reporter.start do |reporter| - reporter.collector = method(:collect_metrics) + reporter.collector = -> { collect_metrics(required_queues) } end end end end - def self.collect_metrics - Sidekiq::Queue.all.each do |queue| + def self.collect_metrics(required_queues = []) + live_queues = Sidekiq::Queue.all.to_h { |q| [q.name, q] } + names = (required_queues + live_queues.keys).uniq + + names.each do |name| + queue = live_queues[name] AutoScalingMetrics::Reporter.add_metric( metric_name: "sidekiq_queue_latency", - value: queue.paused? ? 0 : queue.latency, + value: (queue.nil? || queue.paused?) ? 0 : queue.latency, unit: "Seconds", - dimensions: [{name: "queue_name", value: queue.name}] + dimensions: [{name: "queue_name", value: name}] ) end end diff --git a/spec/services/sidekiq_reporter_spec.rb b/spec/services/sidekiq_reporter_spec.rb index fc3e98a..ba84a66 100644 --- a/spec/services/sidekiq_reporter_spec.rb +++ b/spec/services/sidekiq_reporter_spec.rb @@ -8,10 +8,25 @@ expect(Sidekiq).to receive(:configure_server).and_yield(config = double) expect(config).to receive(:on).with(:leader).and_yield expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double) - expect(reporter).to receive(:collector=).with(described_class.method(:collect_metrics)) + expect(reporter).to receive(:collector=).with(an_instance_of(Proc)) described_class.enable end + + it "passes required_queues through to the collector" do + expect(Sidekiq).to receive(:configure_server).and_yield(config = double) + expect(config).to receive(:on).with(:leader).and_yield + collector = nil + allow(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double) + allow(reporter).to receive(:collector=) { |c| collector = c } + + described_class.enable(required_queues: %w[critical within_3_hours]) + + allow(Sidekiq::Queue).to receive(:all).and_return([]) + expect(described_class).to receive(:collect_metrics).with(%w[critical within_3_hours]).and_call_original + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).twice + collector.call + end end describe ".collect_metrics" do @@ -42,5 +57,39 @@ described_class.collect_metrics end + + it "emits 0 for required_queues that are absent from Redis" do + live = double(name: "default", latency: 5, paused?: false) + allow(Sidekiq::Queue).to receive(:all).and_return([live]) + + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with( + metric_name: "sidekiq_queue_latency", + value: 5, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "default"}] + ) + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with( + metric_name: "sidekiq_queue_latency", + value: 0, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "within_3_hours"}] + ) + + described_class.collect_metrics(%w[default within_3_hours]) + end + + it "deduplicates when a required queue is also live" do + live = double(name: "critical", latency: 2, paused?: false) + allow(Sidekiq::Queue).to receive(:all).and_return([live]) + + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).once.with( + metric_name: "sidekiq_queue_latency", + value: 2, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "critical"}] + ) + + described_class.collect_metrics(%w[critical]) + end end end