Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/request_queue_time/middleware/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module RequestQueueTime
class Middleware
VERSION = "0.2.0"
VERSION = "0.3.0"
end
end
20 changes: 14 additions & 6 deletions lib/services/auto_scaling_metrics/sidekiq_reporter.rb
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
module RequestQueueTime
module AutoScalingMetrics
class SidekiqReporter
def self.enable
# `required_queues` lists queue names that must always publish a datapoint,
# even when Redis has no entry for them. Without this, a queue with a
# CloudWatch autoscaling policy that never enqueues sits permanently in
# INSUFFICIENT_DATA, blocking scale-in for the whole service.
def self.enable(required_queues: [])
Sidekiq.configure_server do |config|
config.on(:leader) do
AutoScalingMetrics::Reporter.start do |reporter|
reporter.collector = method(:collect_metrics)
reporter.collector = -> { collect_metrics(required_queues) }
end
end
end
end

def self.collect_metrics
Sidekiq::Queue.all.each do |queue|
def self.collect_metrics(required_queues = [])
live_queues = Sidekiq::Queue.all.to_h { |q| [q.name, q] }
names = (required_queues + live_queues.keys).uniq

names.each do |name|
queue = live_queues[name]
AutoScalingMetrics::Reporter.add_metric(
metric_name: "sidekiq_queue_latency",
value: queue.paused? ? 0 : queue.latency,
value: (queue.nil? || queue.paused?) ? 0 : queue.latency,
unit: "Seconds",
dimensions: [{name: "queue_name", value: queue.name}]
dimensions: [{name: "queue_name", value: name}]
)
end
end
Expand Down
51 changes: 50 additions & 1 deletion spec/services/sidekiq_reporter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@
expect(Sidekiq).to receive(:configure_server).and_yield(config = double)
expect(config).to receive(:on).with(:leader).and_yield
expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double)
expect(reporter).to receive(:collector=).with(described_class.method(:collect_metrics))
expect(reporter).to receive(:collector=).with(an_instance_of(Proc))

described_class.enable
end

it "passes required_queues through to the collector" do
expect(Sidekiq).to receive(:configure_server).and_yield(config = double)
expect(config).to receive(:on).with(:leader).and_yield
collector = nil
allow(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:start).and_yield(reporter = double)
allow(reporter).to receive(:collector=) { |c| collector = c }

described_class.enable(required_queues: %w[critical within_3_hours])

allow(Sidekiq::Queue).to receive(:all).and_return([])
expect(described_class).to receive(:collect_metrics).with(%w[critical within_3_hours]).and_call_original
expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).twice
collector.call
end
end

describe ".collect_metrics" do
Expand Down Expand Up @@ -42,5 +57,39 @@

described_class.collect_metrics
end

it "emits 0 for required_queues that are absent from Redis" do
live = double(name: "default", latency: 5, paused?: false)
allow(Sidekiq::Queue).to receive(:all).and_return([live])

expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with(
metric_name: "sidekiq_queue_latency",
value: 5,
unit: "Seconds",
dimensions: [{name: "queue_name", value: "default"}]
)
expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).with(
metric_name: "sidekiq_queue_latency",
value: 0,
unit: "Seconds",
dimensions: [{name: "queue_name", value: "within_3_hours"}]
)

described_class.collect_metrics(%w[default within_3_hours])
end

it "deduplicates when a required queue is also live" do
live = double(name: "critical", latency: 2, paused?: false)
allow(Sidekiq::Queue).to receive(:all).and_return([live])

expect(RequestQueueTime::AutoScalingMetrics::Reporter).to receive(:add_metric).once.with(
metric_name: "sidekiq_queue_latency",
value: 2,
unit: "Seconds",
dimensions: [{name: "queue_name", value: "critical"}]
)

described_class.collect_metrics(%w[critical])
end
end
end