Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/sentry/telemetry/buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule Sentry.Telemetry.Buffer do
use GenServer

alias __MODULE__

alias Sentry.ClientReport
alias Sentry.Telemetry.Category

@enforce_keys [:category, :capacity, :batch_size]
Expand Down Expand Up @@ -177,6 +179,12 @@ defmodule Sentry.Telemetry.Buffer do
# At-capacity clause: the buffer is full, so admit `item` by evicting the
# oldest queued entry, reporting the eviction as a `:cache_overflow` discard
# so client reports reflect the dropped event.
defp offer(%Buffer{size: size, capacity: capacity} = state, item)
     when size >= capacity do
  # Recording the discard has no data dependency on the queue operations,
  # so it can happen up front.
  ClientReport.Sender.record_discarded_events(
    :cache_overflow,
    Category.data_category(state.category)
  )

  {{:value, _evicted}, remaining} = :queue.out(state.items)

  %{state | items: :queue.in(item, remaining)}
end

Expand Down
23 changes: 23 additions & 0 deletions lib/sentry/telemetry/category.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,27 @@ defmodule Sentry.Telemetry.Category do
"""
@spec priorities() :: [priority()]
def priorities, do: @priorities

@doc """
Returns the Sentry data category string for a given telemetry category.

These strings are used in client reports and rate limiting.

## Examples

iex> Sentry.Telemetry.Category.data_category(:error)
"error"

iex> Sentry.Telemetry.Category.data_category(:check_in)
"monitor"

iex> Sentry.Telemetry.Category.data_category(:log)
"log_item"

"""
@spec data_category(t()) :: String.t()
def data_category(:error), do: "error"
def data_category(:check_in), do: "monitor"
def data_category(:transaction), do: "transaction"
def data_category(:log), do: "log_item"
end
30 changes: 27 additions & 3 deletions lib/sentry/telemetry/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Sentry.Telemetry.Scheduler do

alias Sentry.Telemetry.{Buffer, Category}
alias Sentry.{CheckIn, ClientReport, Config, Envelope, Event, LogEvent, Transaction, Transport}
alias Sentry.Transport.RateLimiter

@default_capacity 1000

Expand Down Expand Up @@ -226,9 +227,15 @@ defmodule Sentry.Telemetry.Scheduler do

case Buffer.poll_if_ready(buffer) do
{:ok, items} when items != [] ->
state = send_items(state, category, items)
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
if category_rate_limited?(state, category) do
record_rate_limited_discards(category, items)
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
else
state = send_items(state, category, items)
state = advance_cycle(state)
process_cycle(state, attempts + 1, max_attempts)
end

_ ->
state = advance_cycle(state)
Expand Down Expand Up @@ -496,6 +503,23 @@ defmodule Sentry.Telemetry.Scheduler do
end
end

# With an `on_envelope` callback configured (unit-test mode), envelopes never
# reach the real transport, so rate-limit checks are skipped entirely.
defp category_rate_limited?(%{on_envelope: callback}, _category)
     when is_function(callback, 1),
     do: false

# Otherwise consult the global rate limiter using the wire-format data
# category for this telemetry category.
defp category_rate_limited?(_state, category) do
  category
  |> Category.data_category()
  |> RateLimiter.rate_limited?()
end

# Records one `:ratelimit_backoff` discard per dropped item so client reports
# reflect exactly how many events were rate-limited away.
defp record_rate_limited_discards(category, items) do
  data_category = Category.data_category(category)

  # Iterate the list itself rather than `for _ <- 1..length(items)`: when
  # `items == []` the range `1..0` is an implicitly *decreasing* range in
  # Elixir (yielding [1, 0], with a deprecation warning), which would have
  # recorded two spurious discards for an empty batch.
  for _item <- items do
    ClientReport.Sender.record_discarded_events(:ratelimit_backoff, data_category)
  end
end

defp default_weights do
%{
critical: Category.weight(:critical),
Expand Down
170 changes: 170 additions & 0 deletions test/sentry/telemetry_processor_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,167 @@ defmodule Sentry.TelemetryProcessorIntegrationTest do
end
end

describe "buffer overflow client reports" do
setup ctx do
stop_supervised!(ctx.processor)

uid = System.unique_integer([:positive])
processor_name = :"test_overflow_#{uid}"

start_supervised!(
{TelemetryProcessor,
name: processor_name, buffer_configs: %{log: %{capacity: 2, batch_size: 1}}},
id: processor_name
)

Process.put(:sentry_telemetry_processor, processor_name)

send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

Process.sleep(50)

flush_ref_messages(ctx.ref)

%{processor: processor_name}
end

test "sends cache_overflow client report when log buffer overflows", ctx do
scheduler = TelemetryProcessor.get_scheduler(ctx.processor)
:sys.suspend(scheduler)

TelemetryProcessor.add(ctx.processor, make_log_event("log-1"))
TelemetryProcessor.add(ctx.processor, make_log_event("log-2"))
TelemetryProcessor.add(ctx.processor, make_log_event("log-3"))

log_buffer = TelemetryProcessor.get_buffer(ctx.processor, :log)
_ = Buffer.size(log_buffer)
_ = :sys.get_state(Sentry.ClientReport.Sender)

send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

ref = ctx.ref
assert_receive {^ref, body}, 2000

items = decode_envelope!(body)
assert [{%{"type" => "client_report"}, client_report}] = items

cache_overflow =
Enum.find(client_report["discarded_events"], &(&1["reason"] == "cache_overflow"))

assert cache_overflow["category"] == "log_item"
assert cache_overflow["quantity"] == 1

:sys.resume(scheduler)
end
end

describe "scheduler rate limit checks" do
setup ctx do
# Flush any pre-existing client reports so they don't interfere
send(Process.whereis(Sentry.ClientReport.Sender), :send_report)
Process.sleep(50)
flush_ref_messages(ctx.ref)

# Clean up rate limit entries after test
on_exit(fn ->
try do
:ets.delete(Sentry.Transport.RateLimiter, "log_item")
:ets.delete(Sentry.Transport.RateLimiter, "error")
catch
_, _ -> :ok
end
end)

:ok
end

test "drops rate-limited log events before reaching transport", ctx do
scheduler = TelemetryProcessor.get_scheduler(ctx.processor)
:sys.suspend(scheduler)

TelemetryProcessor.add(ctx.processor, make_log_event("rate-limited-log"))

log_buffer = TelemetryProcessor.get_buffer(ctx.processor, :log)
assert Buffer.size(log_buffer) == 1

# Set rate limit on log_item category in the global RateLimiter ETS table
:ets.insert(Sentry.Transport.RateLimiter, {"log_item", System.system_time(:second) + 60})

:sys.resume(scheduler)

# Synchronize - ensure scheduler has processed the pending signal.
# :sys.get_state is processed between regular messages, so this returns
# the state right after the :signal cast finishes but before :DOWN (if any).
scheduler_state = :sys.get_state(scheduler)

# Buffer should be drained (items were polled by the scheduler)
assert Buffer.size(log_buffer) == 0

# Items should have been dropped at the scheduler level, NOT forwarded
# to the transport queue. Without the scheduler-level rate limit check,
# items would be enqueued (size > 0) and a send process spawned (active_ref != nil).
assert scheduler_state.size == 0
assert scheduler_state.active_ref == nil

# Client report should be recorded for rate-limited items
send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

ref = ctx.ref
assert_receive {^ref, body}, 2000

items = decode_envelope!(body)
assert [{%{"type" => "client_report"}, client_report}] = items

ratelimit_event =
Enum.find(client_report["discarded_events"], &(&1["reason"] == "ratelimit_backoff"))

assert ratelimit_event != nil
assert ratelimit_event["category"] == "log_item"
assert ratelimit_event["quantity"] == 1
end

test "drops rate-limited error events before reaching transport", ctx do
put_test_config(telemetry_processor_categories: [:error, :log])

scheduler = TelemetryProcessor.get_scheduler(ctx.processor)
:sys.suspend(scheduler)

Sentry.capture_message("rate-limited-error", result: :none)

error_buffer = TelemetryProcessor.get_buffer(ctx.processor, :error)
assert Buffer.size(error_buffer) == 1

# Set rate limit on error category
:ets.insert(Sentry.Transport.RateLimiter, {"error", System.system_time(:second) + 60})

:sys.resume(scheduler)
scheduler_state = :sys.get_state(scheduler)

# Buffer should be drained
assert Buffer.size(error_buffer) == 0

# Items should have been dropped at the scheduler level
assert scheduler_state.size == 0
assert scheduler_state.active_ref == nil

# Client report should be recorded for rate-limited items
send(Process.whereis(Sentry.ClientReport.Sender), :send_report)

ref = ctx.ref
assert_receive {^ref, body}, 2000

items = decode_envelope!(body)
assert [{%{"type" => "client_report"}, client_report}] = items

ratelimit_event =
Enum.find(client_report["discarded_events"], &(&1["reason"] == "ratelimit_backoff"))

assert ratelimit_event != nil
assert ratelimit_event["category"] == "error"
assert ratelimit_event["quantity"] == 1
end
end

defp make_transaction do
now = System.system_time(:microsecond)

Expand All @@ -281,6 +442,15 @@ defmodule Sentry.TelemetryProcessorIntegrationTest do
}
end


# Drains every `{ref, body}` message already sitting in the test process
# mailbox, returning `:ok` once the mailbox has been quiet for
# `idle_timeout` milliseconds. Keeps envelopes produced by earlier tests
# from leaking into the next assertion. The timeout is parameterized
# (defaulting to the original 100ms) so callers can tighten or loosen the
# quiescence window.
defp flush_ref_messages(ref, idle_timeout \\ 100) do
  receive do
    {^ref, _body} -> flush_ref_messages(ref, idle_timeout)
  after
    idle_timeout -> :ok
  end
end

defp make_log_event(body) do
%LogEvent{
timestamp: System.system_time(:nanosecond) / 1_000_000_000,
Expand Down
Loading