diff --git a/.changeset/eleven-nails-argue.md b/.changeset/eleven-nails-argue.md new file mode 100644 index 0000000000..147520356d --- /dev/null +++ b/.changeset/eleven-nails-argue.md @@ -0,0 +1,5 @@ +--- +'@core/electric-telemetry': patch +--- + +Add binary memory, average number of off-heap binaries and their ref counts to top processes by memory metric. diff --git a/.changeset/forty-pillows-laugh.md b/.changeset/forty-pillows-laugh.md new file mode 100644 index 0000000000..1ebb0e06a6 --- /dev/null +++ b/.changeset/forty-pillows-laugh.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Include stack_id in otel opts for stack metrics. It had been omitted by mistake before. diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 6b0a63bc26..692af0e35d 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -28,6 +28,7 @@ jobs: - uses: actions/checkout@v4 - uses: erlef/setup-beam@v1 + id: setup_beam with: version-type: strict version-file: '.tool-versions' @@ -62,6 +63,15 @@ jobs: run: mix compile working-directory: packages/sync-service + - name: Cache lux + uses: actions/cache@v4 + with: + path: integration-tests/lux + key: '${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }}' + restore-keys: | + ${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }} + ${{ runner.os }}-lux + - name: Setup lux run: make diff --git a/integration-tests/tests/_macros.luxinc b/integration-tests/tests/_macros.luxinc index ae34abbcca..54a0bbc0f8 100644 --- a/integration-tests/tests/_macros.luxinc +++ b/integration-tests/tests/_macros.luxinc @@ -231,11 +231,16 @@ -p 4318:4318 \ -v $(realpath ../support_files/otel-collector-config.yaml):/conf/otel-collector-config.yaml \ otel/opentelemetry-collector-contrib --config=/conf/otel-collector-config.yaml + # Allow time for docker to pull the image if not cached [timeout 120] + ??Starting HTTP server ?"endpoint": "(0\.0\.0\.0|\[::\]):4318" ??Everything is ready. Begin running and processing data. + + # Reset the timeout for subsequent pattern matching + [timeout 10] [endmacro] [macro teardown_container container_name] diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux new file mode 100644 index 0000000000..c3a2cf5928 --- /dev/null +++ b/integration-tests/tests/otel-export.lux @@ -0,0 +1,232 @@ +[doc Verify that application and stack metrics are correctly exported via Otel] + +[include _macros.luxinc] + +[global pg_container_name=otel-export__pg] + +### + +[invoke setup_otel_collector] + +[invoke setup_pg] + +[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_STACK_TELEMETRY_INIT_DELAY=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] + +# Spawn a process containing off-heap binary references and ensure its in the top 5 by memory footprint. +# Otel Collector sorts metrics in its debug output, so the process label for our process needs +# to be first lexicographically for easier output matching further down. +[shell electric] + """! + _pid = spawn_link(fn -> + Process.set_label(:A_memory_hog) + + on_heap_strings = + Enum.map(1..4000, fn _ -> String.duplicate("on heap", 4000) end) + + off_heap_strings = + Enum.map(1..10, fn i -> String.duplicate("1234567890", 7000) end) + + receive do + pid -> send(pid, {on_heap_strings, off_heap_strings}) + end + end) + """ + + ??#PID + +[shell otel_collector] + # Verify that the Collector receives expected application metrics from Electric + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + ScopeMetrics #0 + """ + + # Verify the presence of process.bin_memory.* metrics + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.avg_bin_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.avg_ref_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: 1\.000000 + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.max_bin_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.max_ref_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.total + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + # Verify the presence of process.memory.total metric + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.total + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + # Verify that the Collector receives stack metrics from Electric with expected resource attributes + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + -> stack_id: Str\(single_stack\) + ScopeMetrics #0 + """ + + # Verify that LSN metrics are exported + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.pg_wal_offset + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_retained_wal_size + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + +[invoke start_psql] + +[shell psql] + !create table items (val text); + ??CREATE + +[shell client] + [invoke curl_shape "http://localhost:3000/v1/shape?table=items&offset=-1"] + + ??HTTP/1.1 200 OK + ?electric-handle: ([\d-]+) + [global handle=$1] + ?electric-offset: ([\w\d_]+) + [global offset=$1] + +[shell psql] + !insert into items values ('3'); + ??INSERT + +[shell client] + [invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=$offset&live"] + + ??HTTP/1.1 200 OK + ??"value":{"val":"3"} + +[shell otel_collector] + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.storage\.transaction_stored\.bytes + """ + +### + +[cleanup] + [invoke teardown] diff --git a/integration-tests/tests/stack-telemetry.lux b/integration-tests/tests/stack-telemetry.lux deleted file mode 100644 index 18903a96e9..0000000000 --- a/integration-tests/tests/stack-telemetry.lux +++ /dev/null @@ -1,98 +0,0 @@ -[doc Verify that all of the expected stack metrics are exported via Otel] - -[include _macros.luxinc] - -[global pg_container_name=stack-telemetry__pg] - -### - -[invoke setup_otel_collector] - -[invoke setup_pg] - -[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] - -[shell otel_collector] - # Verify that the Collector receives metrics from Electric with expected resource attributes - ??info ResourceMetrics - ??Resource attributes: - ?? -> custom.attr: Str(electric.val) - ?? -> instance.id: Str( - ?? -> name: Str(metrics) - ?? -> service.name: Str(electric) - ?? -> service.version: Str( - - # Verify that LSN metrics are exported - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.pg_wal_offset - -> Description: - -> Unit: - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_retained_wal_size - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ -[invoke start_psql] - -[shell psql] - !create table items (val text); - ??CREATE - -[shell client] - [invoke curl_shape "http://localhost:3000/v1/shape?table=items&offset=-1"] - - ??HTTP/1.1 200 OK - ?electric-handle: ([\d-]+) - [global handle=$1] - ?electric-offset: ([\w\d_]+) - [global offset=$1] - -[shell psql] - !insert into items values ('3'); - ??INSERT - -[shell client] - [invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=$offset&live"] - - ??HTTP/1.1 200 OK - ??"value":{"val":"3"} - -[shell otel_collector] - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.storage\.transaction_stored\.bytes - """ -### - -[cleanup] - [invoke teardown] diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 883af13cd0..4cd5bca788 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -76,6 +76,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do :garbage_collection, :reductions, :process_memory, + :process_bin_memory, :ets_memory, :get_system_load_average, :get_system_memory_usage @@ -87,6 +88,11 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ last_value("process.memory.total", tags: [:process_type], unit: :byte), + last_value("process.bin_memory.total", tags: [:process_type], unit: :byte), + last_value("process.bin_memory.max_bin_count", tags: [:process_type]), + last_value("process.bin_memory.avg_bin_count", tags: [:process_type]), + last_value("process.bin_memory.max_ref_count", tags: [:process_type]), + last_value("process.bin_memory.avg_ref_count", tags: [:process_type]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), @@ -172,9 +178,28 @@ defmodule ElectricTelemetry.ApplicationTelemetry do end def process_memory(%{intervals_and_thresholds: %{top_process_limit: process_limit}}) do - for %{type: type, memory: memory} <- - ElectricTelemetry.Processes.top_memory_by_type(process_limit) do - :telemetry.execute([:process, :memory], %{total: memory}, %{process_type: to_string(type)}) + for map <- ElectricTelemetry.Processes.top_memory_by_type(process_limit) do + :telemetry.execute( + [:process, :memory], + %{total: map.proc_mem}, + %{process_type: to_string(map.type)} + ) + end + end + + def process_bin_memory(%{intervals_and_thresholds: %{top_process_limit: process_limit}}) do + for map <- ElectricTelemetry.Processes.top_bin_memory_by_type(process_limit) do + :telemetry.execute( + [:process, :bin_memory], + %{ + total: map.binary_mem, + max_bin_count: map.max_bin_count, + avg_bin_count: map.avg_bin_count, + max_ref_count: map.max_ref_count, + avg_ref_count: map.avg_ref_count + }, + %{process_type: to_string(map.type)} + ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/opts.ex b/packages/electric-telemetry/lib/electric/telemetry/opts.ex index 189aea5053..7805d1d7fb 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/opts.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/opts.ex @@ -22,6 +22,7 @@ defmodule ElectricTelemetry.Opts do default: [], keys: [ system_metrics_poll_interval: [type: :integer, default: :timer.seconds(5)], + stack_telemetry_init_delay: [type: :integer, default: :timer.seconds(30)], top_process_limit: [ type: {:or, @@ -31,7 +32,8 @@ defmodule ElectricTelemetry.Opts do [ {:in, [:mem_percent]}, {:custom, ElectricTelemetry.Processes, :validate_mem_percent, []} - ]} + ]}, + {:tuple, [{:in, [:at_least_bytes]}, :non_neg_integer]} ]}, default: {:count, 5} ], diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index b34f87724b..ed07365dad 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -1,10 +1,13 @@ defmodule ElectricTelemetry.Processes do + @type limit :: + {:count, pos_integer()} | {:mem_percent, 1..100} | {:at_least_bytes, non_neg_integer()} + @default_count 5 + @default_limit {:count, @default_count} + # Minimum memory threshold for a process group when using :mem_percent mode. @min_group_memory 1024 * 1024 - @type limit :: {:count, pos_integer()} | {:mem_percent, 1..100} - defguardp is_valid_mem_percent(percent) when is_integer(percent) and percent >= 1 and percent <= 100 @@ -18,72 +21,122 @@ defmodule ElectricTelemetry.Processes do def proc_type(pid), do: proc_type(pid, info(pid)) - def top_memory_by_type do - top_memory_by_type(Process.list(), {:count, @default_count}) - end + def top_memory_by_type, do: top_by(:proc_mem) + def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit) + def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit) - def top_memory_by_type({_, _} = limit) do - top_memory_by_type(Process.list(), limit) - end + def top_bin_memory_by_type, do: top_by(:binary_mem) + def top_bin_memory_by_type({_, _} = limit), do: top_by(:binary_mem, limit) + def top_bin_memory_by_type(proc_list_or_limit), do: top_by(:binary_mem, proc_list_or_limit) + def top_bin_memory_by_type(proc_list, limit), do: top_by(:binary_mem, proc_list, limit) - def top_memory_by_type(process_list) when is_list(process_list) do - top_memory_by_type(process_list, {:count, @default_count}) - end + def top_by(sort_key), do: top_by(sort_key, Process.list(), @default_limit) + def top_by(sort_key, {_, _} = limit), do: top_by(sort_key, Process.list(), limit) + + def top_by(sort_key, proc_list) when is_list(proc_list), + do: top_by(sort_key, proc_list, @default_limit) - def top_memory_by_type(process_list, {:count, count}) - when is_list(process_list) and is_integer(count) and count > 0 do + defp top_by(sort_key, process_list, {:count, count}) + when is_integer(count) and count > 0 do process_list - |> sorted_groups() + |> sorted_groups(sort_key) |> Enum.take(count) end - def top_memory_by_type(process_list, {:mem_percent, percent}) - when is_list(process_list) and is_valid_mem_percent(percent) do + # When sorting by binary mem, processes double-count the same refc binary, so it doesn't + # make sense to talk about a "percentage of the total" in that case. + # Instead, for binary memory telemetry the low cutoff threshold should be provided. + defp top_by(:proc_mem, process_list, {:mem_percent, percent}) + when is_valid_mem_percent(percent) do # :processes_used excludes memory allocated but not yet used by process heaps, # giving a more accurate baseline for the percentage calculation. total_process_memory = :erlang.memory(:processes_used) target = total_process_memory * percent / 100 process_list - |> sorted_groups() - |> take_until_target(target) + |> sorted_groups(:proc_mem) + |> take_until_target(:proc_mem, target, @min_group_memory) + end + + defp top_by(sort_key, process_list, {:at_least_bytes, low_cutoff}) + when is_integer(low_cutoff) and low_cutoff >= 0 do + process_list + |> sorted_groups(sort_key) + |> take_until_target(sort_key, :infinity, low_cutoff) end - defp sorted_groups(process_list) do + defp sorted_groups(process_list, sort_key) do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) - |> Enum.group_by(& &1.type, & &1.memory) - |> Enum.map(fn {type, memory} -> %{type: type, memory: Enum.sum(memory)} end) - |> Enum.sort_by(&(-&1.memory)) + |> Enum.group_by(& &1.type) + |> Enum.map(fn {type, proc_infos} -> + proc_infos + |> mem_stats_for_procs() + |> Map.put(:type, type) + end) + |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end - defp take_until_target(proc_groups, target) do + defp take_until_target(proc_groups, sort_key, target, low_cutoff) do {_running_total, selected_groups} = Enum.reduce_while(proc_groups, {0, []}, fn _proc_group, {running_total, acc} when running_total >= target -> {:halt, {running_total, acc}} - proc_group, {running_total, acc} when proc_group.memory < @min_group_memory -> - # Include this last process group in the result so it's clear to the caller that the - # minimum group memory threshold has been reached earlier than the target total mem - # one. - {:halt, {running_total, [proc_group | acc]}} - proc_group, {running_total, acc} -> - {:cont, {running_total + proc_group.memory, [proc_group | acc]}} + value = Map.fetch!(proc_group, sort_key) + + if value < low_cutoff do + # Include this last process group in the result so it's clear to the caller that the + # low cutoff threshold has been reached earlier than the target total mem one. + {:halt, {running_total, [proc_group | acc]}} + else + {:cont, {running_total + value, [proc_group | acc]}} + end end) Enum.reverse(selected_groups) end + defp mem_stats_for_procs(proc_infos) when is_list(proc_infos) do + {proc_mem, binary_mem, max_ref_count, ref_count_sum, max_num_binaries, num_binaries, + num_procs} = + Enum.reduce(proc_infos, {0, 0, 0, 0, 0, 0, 0}, fn map, + {proc_mem, binary_mem, max_ref_count, + ref_count_sum, max_num_binaries, + num_binaries, num_procs} -> + { + proc_mem + map.proc_mem, + binary_mem + map.binary_mem, + max(max_ref_count, map.max_ref_count), + ref_count_sum + map.ref_count_sum, + max(max_num_binaries, map.num_binaries), + num_binaries + map.num_binaries, + num_procs + 1 + } + end) + + %{ + proc_mem: proc_mem, + binary_mem: binary_mem, + max_bin_count: max_num_binaries, + avg_bin_count: num_binaries / num_procs, + max_ref_count: max_ref_count, + avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries) + } + end + defp type_and_memory(pid) do info = info(pid) - %{type: proc_type(pid, info), memory: memory_from_info(info)} + + info + |> memory_from_info() + |> Map.put(:type, proc_type(pid, info)) end defp info(pid) do - Process.info(pid, [:dictionary, :initial_call, :label, :memory]) + Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary]) end defp proc_type(pid, info) do @@ -117,9 +170,36 @@ defmodule ElectricTelemetry.Processes do end defp memory_from_info(info) do - case info[:memory] do - bytes when is_integer(bytes) -> bytes - _ -> 0 + memory = + case info[:memory] do + bytes when is_integer(bytes) -> bytes + _ -> 0 + end + + case info[:binary] do + list when is_list(list) -> + {binary_mem, max_ref_count, ref_count_sum, num_entries} = + Enum.reduce(list, {0, 0, 0, 0}, fn {_reference, bin_size, bin_ref_count}, + {binary_mem, max_ref_count, ref_count_sum, + num_entries} -> + { + binary_mem + bin_size, + max(max_ref_count, bin_ref_count), + ref_count_sum + bin_ref_count, + num_entries + 1 + } + end) + + %{ + proc_mem: memory, + binary_mem: binary_mem, + max_ref_count: max_ref_count, + ref_count_sum: ref_count_sum, + num_binaries: num_entries + } + + _ -> + %{proc_mem: memory, binary_mem: 0, max_ref_count: 0, ref_count_sum: 0, num_binaries: 0} end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex index 3e2723b90b..fc209021ce 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex @@ -38,7 +38,7 @@ defmodule ElectricTelemetry.StackTelemetry do [ ElectricTelemetry.Poller.child_spec(opts, callback_module: __MODULE__, - init_delay: :timer.seconds(30) + init_delay: opts.intervals_and_thresholds.stack_telemetry_init_delay ) ], disk_usage_child_specs(opts), diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index c4e8a324e7..22462b5b51 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -1,7 +1,7 @@ defmodule ElectricTelemetry.ProcessesTest do use ExUnit.Case, async: true - import ElectricTelemetry.Processes, only: [proc_type: 1] + import ElectricTelemetry.Processes describe "proc_type/1 with binary labels" do test "groups request labels by method and path, stripping query and request id" do @@ -120,8 +120,6 @@ defmodule ElectricTelemetry.ProcessesTest do end describe "top_memory_by_type/[1, 2]" do - import ElectricTelemetry.Processes, only: [top_memory_by_type: 0, top_memory_by_type: 1] - test "handles dead processes" do parent = self() @@ -145,25 +143,33 @@ defmodule ElectricTelemetry.ProcessesTest do refute Process.alive?(pid1) - assert [%{memory: memory, type: :erlang}] = top_memory_by_type([pid1, pid2]) + assert [ + %{ + proc_mem: memory, + binary_mem: _, + avg_bin_count: _, + avg_ref_count: _, + type: :erlang + } + ] = top_memory_by_type([pid1, pid2]) assert is_integer(memory) end test "defaults to top 5 of all processes" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type() end test "allows for setting count limit" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type({:count, 2}) end @@ -178,11 +184,81 @@ defmodule ElectricTelemetry.ProcessesTest do assert length(results) >= 1 total_process_memory = :erlang.memory(:processes_used) - returned_memory = results |> Enum.map(& &1.memory) |> Enum.sum() + returned_memory = results |> Enum.map(& &1.proc_mem) |> Enum.sum() # Either we hit the 50% target or we ran out of groups above 1MiB assert returned_memory >= total_process_memory * 0.5 or - List.last(results).memory < 1024 * 1024 + List.last(results).proc_mem < 1024 * 1024 + end + end + + describe "top_bin_memory_by_type/[0, 1, 2]" do + test "defaults to top 5 sorted by binary_mem" do + results = top_bin_memory_by_type() + assert length(results) == 5 + binary_mems = Enum.map(results, & &1.binary_mem) + assert binary_mems == Enum.sort(binary_mems, :desc) + end + + test "sorts by binary_mem, not proc_mem" do + # Spawn a process with large binary memory but small heap + spawn_with_label(:big_binary, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(2 * 1024 * 1024)) + end) + + # Spawn a process with large heap but no binary memory + spawn_with_label(:big_heap, fn -> + Process.put(:list, Enum.to_list(1..200_000)) + end) + + proc_mem_results = top_memory_by_type({:count, 100}) + bin_mem_results = top_bin_memory_by_type({:count, 100}) + + # Each list is sorted by its own key + proc_mems = Enum.map(proc_mem_results, & &1.proc_mem) + assert proc_mems == Enum.sort(proc_mems, :desc) + + binary_mems = Enum.map(bin_mem_results, & &1.binary_mem) + assert binary_mems == Enum.sort(binary_mems, :desc) + + # The top entry in each list should be different + assert hd(proc_mem_results).type != hd(bin_mem_results).type + end + + test "at_least_bytes stops at the cutoff" do + spawn_with_label(:bin_large, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(2 * 1024 * 1024)) + end) + + spawn_with_label(:bin_small, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(100)) + end) + + results = top_bin_memory_by_type({:at_least_bytes, 1024 * 1024}) + + # The last entry should be the one that fell below the cutoff + last = List.last(results) + above_cutoff = Enum.drop(results, -1) + + assert Enum.all?(above_cutoff, &(&1.binary_mem >= 1024 * 1024)) + assert last.binary_mem < 1024 * 1024 + end + + test "at_least_bytes with process list" do + pid1 = + spawn_with_label(:with_bin, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(512 * 1024)) + end) + + pid2 = + spawn_with_label(:without_bin, fn -> + :ok + end) + + results = top_bin_memory_by_type([pid1, pid2], {:at_least_bytes, 1024}) + + types = Enum.map(results, & &1.type) + assert :with_bin in types end end diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 892b55cafd..053434cfdf 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -177,6 +177,13 @@ otel_export_period = nil ) +stack_telemetry_init_delay = + env!( + "ELECTRIC_STACK_TELEMETRY_INIT_DELAY", + &Electric.Config.parse_human_readable_time!/1, + nil + ) + # The provided database id is relevant if you had used v0.8 and want to keep the storage # instead of having hanging files. We use a provided value as stack id, but nothing else. provided_database_id = env!("ELECTRIC_DATABASE_ID", :string, nil) @@ -214,6 +221,7 @@ config :electric, call_home_telemetry?: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod), telemetry_url: call_home_telemetry_url, system_metrics_poll_interval: system_metrics_poll_interval, + stack_telemetry_init_delay: stack_telemetry_init_delay, otel_export_period: otel_export_period, otel_sampling_ratio: env!("ELECTRIC_OTEL_SAMPLING_RATIO", :float, nil), metrics_sampling_ratio: env!("ELECTRIC_METRICS_SAMPLING_RATIO", :float, nil), diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 2d22006f25..9a613c8893 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -381,6 +381,7 @@ defmodule Electric.Application do intervals_and_thresholds: get_opts(opts, system_metrics_poll_interval: :system_metrics_poll_interval, + stack_telemetry_init_delay: :stack_telemetry_init_delay, long_gc_threshold: :telemetry_long_gc_threshold, long_schedule_threshold: :telemetry_long_schedule_threshold, long_message_queue_enable_threshold: :telemetry_long_message_queue_enable_threshold, diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 0b5c4fdeb5..f163f1f7c4 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -154,10 +154,21 @@ defmodule Electric.StackSupervisor.Telemetry do if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do def child_spec(config) when is_map(config) do + otel_opts_base = Keyword.get(config.telemetry_opts, :otel_opts, []) + existing_resource = Keyword.get(otel_opts_base, :resource, %{}) + + otel_opts = + Keyword.put( + otel_opts_base, + :resource, + Map.put_new(existing_resource, :stack_id, config.stack_id) + ) + telemetry_opts = config.telemetry_opts |> Keyword.put(:stack_id, config.stack_id) |> Keyword.put(:storage_dir, config.storage_dir) + |> Keyword.put(:otel_opts, otel_opts) # Always enable default periodic measurements in addition to the user-provided ones |> Keyword.update( :periodic_measurements,