From 3e8cafe1b1b96b9b7499e072a4e50d0e3f8d5419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Nordstrand?= Date: Mon, 22 Jun 2026 12:21:08 +0200 Subject: [PATCH] Let SidekiqReporter publish zero for required-but-empty queues `SidekiqReporter` only published metrics for queues that currently exist in Redis. A queue with a per-queue CloudWatch autoscaling policy that never enqueues on a given shard would sit permanently in INSUFFICIENT_DATA, blocking scale-in for the whole ECS service. Add an optional `required_queues:` kwarg to `.enable`. The collector iterates `(required_queues + live.keys).uniq` and emits `0` (the correct latency for an empty queue) for any required queue missing from Redis. Behavior with `required_queues` omitted is unchanged, so existing consumers are unaffected. Bump to 0.3.0 (new public kwarg). --- lib/request_queue_time/middleware/version.rb | 2 +- .../auto_scaling_metrics/sidekiq_reporter.rb | 20 +++++--- spec/services/sidekiq_reporter_spec.rb | 51 ++++++++++++++++++- 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/lib/request_queue_time/middleware/version.rb b/lib/request_queue_time/middleware/version.rb index 96547ff..644ad93 100644 --- a/lib/request_queue_time/middleware/version.rb +++ b/lib/request_queue_time/middleware/version.rb @@ -1,5 +1,5 @@ module RequestQueueTime class Middleware - VERSION = "0.2.0" + VERSION = "0.3.0" end end diff --git a/lib/services/auto_scaling_metrics/sidekiq_reporter.rb b/lib/services/auto_scaling_metrics/sidekiq_reporter.rb index 8e0ad71..e6af036 100644 --- a/lib/services/auto_scaling_metrics/sidekiq_reporter.rb +++ b/lib/services/auto_scaling_metrics/sidekiq_reporter.rb @@ -1,23 +1,31 @@ module RequestQueueTime module AutoScalingMetrics class SidekiqReporter - def self.enable + # `required_queues` lists queue names that must always publish a datapoint, + # even when Redis has no entry for them. Without this, a queue with a + # CloudWatch autoscaling policy that never enqueues sits permanently in + # INSUFFICIENT_DATA, blocking scale-in for the whole service. + def self.enable(required_queues: []) Sidekiq.configure_server do |config| config.on(:leader) do AutoScalingMetrics::Reporter.start do |reporter| - reporter.collector = method(:collect_metrics) + reporter.collector = -> { collect_metrics(required_queues) } end end end end - def self.collect_metrics - Sidekiq::Queue.all.each do |queue| + def self.collect_metrics(required_queues = []) + live_queues = Sidekiq::Queue.all.to_h { |q| [q.name, q] } + names = (required_queues + live_queues.keys).uniq + + names.each do |name| + queue = live_queues[name] AutoScalingMetrics::Reporter.add_metric( metric_name: "sidekiq_queue_latency", - value: queue.paused? ? 0 : queue.latency, + value: (queue.nil? || queue.paused?) ? 0 : queue.latency, unit: "Seconds", - dimensions: [{name: "queue_name", value: queue.name}] + dimensions: [{name: "queue_name", value: name}] ) end end diff --git a/spec/services/sidekiq_reporter_spec.rb b/spec/services/sidekiq_reporter_spec.rb index fc3e98a..ba84a66 100644 --- a/spec/services/sidekiq_reporter_spec.rb +++ b/spec/services/sidekiq_reporter_spec.rb @@ -8,10 +8,25 @@ expect(Sidekiq).to receive(:configure_server).and_yield(config = double) expect(config).to receive(:on).with(:leader).and_yield expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double) - expect(reporter).to receive(:collector=).with(described_class.method(:collect_metrics)) + expect(reporter).to receive(:collector=).with(an_instance_of(Proc)) described_class.enable end + + it "passes required_queues through to the collector" do + expect(Sidekiq).to receive(:configure_server).and_yield(config = double) + expect(config).to receive(:on).with(:leader).and_yield + collector = nil + allow(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double) + allow(reporter).to receive(:collector=) { |c| collector = c } + + described_class.enable(required_queues: %w[critical within_3_hours]) + + allow(Sidekiq::Queue).to receive(:all).and_return([]) + expect(described_class).to receive(:collect_metrics).with(%w[critical within_3_hours]).and_call_original + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).twice + collector.call + end end describe ".collect_metrics" do @@ -42,5 +57,39 @@ described_class.collect_metrics end + + it "emits 0 for required_queues that are absent from Redis" do + live = double(name: "default", latency: 5, paused?: false) + allow(Sidekiq::Queue).to receive(:all).and_return([live]) + + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with( + metric_name: "sidekiq_queue_latency", + value: 5, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "default"}] + ) + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with( + metric_name: "sidekiq_queue_latency", + value: 0, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "within_3_hours"}] + ) + + described_class.collect_metrics(%w[default within_3_hours]) + end + + it "deduplicates when a required queue is also live" do + live = double(name: "critical", latency: 2, paused?: false) + allow(Sidekiq::Queue).to receive(:all).and_return([live]) + + expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).once.with( + metric_name: "sidekiq_queue_latency", + value: 2, + unit: "Seconds", + dimensions: [{name: "queue_name", value: "critical"}] + ) + + described_class.collect_metrics(%w[critical]) + end end end