From 9319bd36cfd3f5c6d77fb674dfce80c75d57e57e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 13:16:40 +0100 Subject: [PATCH 01/22] Verify that stack telemetry includes stack_id in the OTEL resource --- integration-tests/tests/stack-telemetry.lux | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/integration-tests/tests/stack-telemetry.lux b/integration-tests/tests/stack-telemetry.lux index 18903a96e9..8abc93a567 100644 --- a/integration-tests/tests/stack-telemetry.lux +++ b/integration-tests/tests/stack-telemetry.lux @@ -14,13 +14,17 @@ [shell otel_collector] # Verify that the Collector receives metrics from Electric with expected resource attributes - ??info ResourceMetrics - ??Resource attributes: - ?? -> custom.attr: Str(electric.val) - ?? -> instance.id: Str( - ?? -> name: Str(metrics) - ?? -> service.name: Str(electric) - ?? -> service.version: Str( + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + -> stack_id: Str\(single_stack\) + """ # Verify that LSN metrics are exported """? 
From cba11fc2a37e04ede36eddc7952547f42eb413f5 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 13:17:44 +0100 Subject: [PATCH 02/22] Include stack_id in the OTEL resource for stack telemetry --- .../sync-service/lib/electric/stack_supervisor/telemetry.ex | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 0b5c4fdeb5..1ba51236b8 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -154,10 +154,15 @@ defmodule Electric.StackSupervisor.Telemetry do if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do def child_spec(config) when is_map(config) do + otel_opts = + Keyword.get(config.telemetry_opts, :otel_opts, []) + |> Keyword.put_new(:resource, %{stack_id: config.stack_id}) + telemetry_opts = config.telemetry_opts |> Keyword.put(:stack_id, config.stack_id) |> Keyword.put(:storage_dir, config.storage_dir) + |> Keyword.put(:otel_opts, otel_opts) # Always enable default periodic measurements in addition to the user-provided ones |> Keyword.update( :periodic_measurements, From fe08f651a0c2a4ebdbe93047ddeef30c48f3bb8f Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Wed, 25 Mar 2026 17:37:50 +0100 Subject: [PATCH 03/22] Merge stack_id into existing OTEL resource instead of using put_new put_new would silently drop stack_id when the user provides a custom :resource map. Use Map.put_new on the existing resource map instead, so stack_id is always included without overwriting user-provided keys. 
Co-Authored-By: Claude Opus 4.6 --- .../sync-service/lib/electric/stack_supervisor/telemetry.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 1ba51236b8..f1a35bcc11 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -154,9 +154,11 @@ defmodule Electric.StackSupervisor.Telemetry do if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do def child_spec(config) when is_map(config) do + otel_opts_base = Keyword.get(config.telemetry_opts, :otel_opts, []) + existing_resource = Keyword.get(otel_opts_base, :resource, %{}) + otel_opts = - Keyword.get(config.telemetry_opts, :otel_opts, []) - |> Keyword.put_new(:resource, %{stack_id: config.stack_id}) + Keyword.put(otel_opts_base, :resource, Map.put_new(existing_resource, :stack_id, config.stack_id)) telemetry_opts = config.telemetry_opts From cda479508ccfbda169e8504639600e8178add740 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 13:18:09 +0100 Subject: [PATCH 04/22] Include binary_mem and average ref count in top process metrics --- .../telemetry/application_telemetry.ex | 8 ++- .../lib/electric/telemetry/processes.ex | 60 ++++++++++++++++--- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 883af13cd0..4ddcdac477 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -172,9 +172,13 @@ defmodule ElectricTelemetry.ApplicationTelemetry do end def process_memory(%{intervals_and_thresholds: %{top_process_limit: process_limit}}) do - for %{type: type, 
memory: memory} <- + for %{type: type, proc_mem: proc_mem, binary_mem: binary_mem, avg_ref_count: avg_ref_count} <- ElectricTelemetry.Processes.top_memory_by_type(process_limit) do - :telemetry.execute([:process, :memory], %{total: memory}, %{process_type: to_string(type)}) + :telemetry.execute( + [:process, :memory], + %{total: proc_mem, binary: binary_mem, avg_ref_count: avg_ref_count}, + %{process_type: to_string(type)} + ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index b34f87724b..add0f3e57e 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -53,9 +53,13 @@ defmodule ElectricTelemetry.Processes do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) - |> Enum.group_by(& &1.type, & &1.memory) - |> Enum.map(fn {type, memory} -> %{type: type, memory: Enum.sum(memory)} end) - |> Enum.sort_by(&(-&1.memory)) + |> Enum.group_by(& &1.type) + |> Enum.map(fn {type, proc_infos} -> + proc_infos + |> mem_stats_for_procs() + |> Map.put(:type, type) + end) + |> Enum.sort_by(&(-&1.proc_mem)) end defp take_until_target(proc_groups, target) do @@ -71,19 +75,37 @@ defmodule ElectricTelemetry.Processes do {:halt, {running_total, [proc_group | acc]}} proc_group, {running_total, acc} -> - {:cont, {running_total + proc_group.memory, [proc_group | acc]}} + {:cont, {running_total + proc_group.proc_mem, [proc_group | acc]}} end) Enum.reverse(selected_groups) end + defp mem_stats_for_procs(proc_infos) when is_list(proc_infos) do + {proc_mem, binary_mem, ref_count_sum, num_binaries} = + Enum.reduce(proc_infos, {0, 0, 0, 0}, fn map, + {proc_mem, binary_mem, ref_count_sum, num_binaries} -> + {proc_mem + map.proc_mem, binary_mem + map.binary_mem, ref_count_sum + map.ref_count_sum, + num_binaries + map.num_binaries} + end) + + %{ + proc_mem: proc_mem, + 
binary_mem: binary_mem, + avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries) + } + end + defp type_and_memory(pid) do info = info(pid) - %{type: proc_type(pid, info), memory: memory_from_info(info)} + + info + |> memory_from_info() + |> Map.put(:type, proc_type(pid, info)) end defp info(pid) do - Process.info(pid, [:dictionary, :initial_call, :label, :memory]) + Process.info(pid, [:dictionary, :initial_call, :label, :memory, :binary]) end defp proc_type(pid, info) do @@ -117,9 +139,29 @@ defmodule ElectricTelemetry.Processes do end defp memory_from_info(info) do - case info[:memory] do - bytes when is_integer(bytes) -> bytes - _ -> 0 + memory = + case info[:memory] do + bytes when is_integer(bytes) -> bytes + _ -> 0 + end + + case info[:binary] do + list when is_list(list) -> + {binary_mem, {ref_sum, num_entries}} = + Enum.reduce(list, {0, {0, 0}}, fn {_reference, size, ref_count}, + {total_size, {ref_sum, num_entries}} -> + {total_size + size, {ref_sum + ref_count, num_entries + 1}} + end) + + %{ + proc_mem: memory, + binary_mem: binary_mem, + ref_count_sum: ref_sum, + num_binaries: num_entries + } + + _ -> + %{proc_mem: memory} end end From 26a1859b7c15cbd55845b411113f6b3d43f456cc Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 13:22:32 +0100 Subject: [PATCH 05/22] Add application metrics to the otel telemetry integration test --- integration-tests/tests/otel-export.lux | 185 ++++++++++++++++++ integration-tests/tests/stack-telemetry.lux | 102 ---------- .../telemetry/application_telemetry.ex | 2 + 3 files changed, 187 insertions(+), 102 deletions(-) create mode 100644 integration-tests/tests/otel-export.lux delete mode 100644 integration-tests/tests/stack-telemetry.lux diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux new file mode 100644 index 0000000000..b42d958935 --- /dev/null +++ b/integration-tests/tests/otel-export.lux @@ -0,0 +1,185 @@ +[doc Verify that 
application and stack metrics are correctly exported via Otel] + +[include _macros.luxinc] + +[global pg_container_name=otel-export__pg] + +### + +[invoke setup_otel_collector] + +[invoke setup_pg] + +[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] + +# Spawn a process containing off-heap binary references and ensure it's in the top 5 by memory footprint. +# Otel Collector sorts metrics in its debug output, so the process label for our process needs +# to be first lexicographically for easier output matching further down. +[shell electric] + """! + _pid = spawn_link(fn -> + Process.set_label(:A_memory_hog) + + on_heap_strings = + Enum.map(1..1_000_000, fn _ -> String.duplicate("on heap", :random.uniform(8)) end) + + off_heap_strings = + Enum.map(1..100_000, fn i -> String.duplicate("1234567890", 70 * i) end) + + receive do + pid -> send(pid, {on_heap_strings, off_heap_strings}) + end + end) + """ + + ??#PID + +[shell otel_collector] + # Verify that the Collector receives expected application metrics from Electric + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + ScopeMetrics #0 + """ + + # Verify the presence of process.memory.* metrics + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.avg_ref_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: 1\.000000 + """ + + """? 
+ Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.binary + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.total + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + # Verify that the Collector receives stack metrics from Electric with expected resource attributes + """? + info ResourceMetrics #0 + Resource SchemaURL: + Resource attributes: + -> custom.attr: Str\(electric.val\) + -> instance.id: Str\([-a-f0-9]+\) + -> name: Str\(metrics\) + -> service.name: Str\(electric\) + -> service.version: Str\([0-9.]+\) + -> stack_id: Str\(single_stack\) + ScopeMetrics #0 + """ + + # Verify that LSN metrics are exported + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.pg_wal_offset + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ + + """? 
+ Metric #[0-9]+ + Descriptor: + -> Name: electric\.postgres\.replication\.slot_retained_wal_size + -> Description: + -> Unit: By + -> DataType: Gauge + NumberDataPoints #0 + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [-+.0-9]+ + """ +[invoke start_psql] + +[shell psql] + !create table items (val text); + ??CREATE + +[shell client] + [invoke curl_shape "http://localhost:3000/v1/shape?table=items&offset=-1"] + + ??HTTP/1.1 200 OK + ?electric-handle: ([\d-]+) + [global handle=$1] + ?electric-offset: ([\w\d_]+) + [global offset=$1] + +[shell psql] + !insert into items values ('3'); + ??INSERT + +[shell client] + [invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=$offset&live"] + + ??HTTP/1.1 200 OK + ??"value":{"val":"3"} + +[shell otel_collector] + """? + Metric #[0-9]+ + Descriptor: + -> Name: electric\.storage\.transaction_stored\.bytes + """ + +### + +[cleanup] + [invoke teardown] diff --git a/integration-tests/tests/stack-telemetry.lux b/integration-tests/tests/stack-telemetry.lux deleted file mode 100644 index 8abc93a567..0000000000 --- a/integration-tests/tests/stack-telemetry.lux +++ /dev/null @@ -1,102 +0,0 @@ -[doc Verify that all of the expected stack metrics are exported via Otel] - -[include _macros.luxinc] - -[global pg_container_name=stack-telemetry__pg] - -### - -[invoke setup_otel_collector] - -[invoke setup_pg] - -[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] - -[shell otel_collector] - # Verify that the Collector receives metrics from Electric with expected resource attributes - """? 
- info ResourceMetrics #0 - Resource SchemaURL: - Resource attributes: - -> custom.attr: Str\(electric.val\) - -> instance.id: Str\([-a-f0-9]+\) - -> name: Str\(metrics\) - -> service.name: Str\(electric\) - -> service.version: Str\([0-9.]+\) - -> stack_id: Str\(single_stack\) - """ - - # Verify that LSN metrics are exported - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.pg_wal_offset - -> Description: - -> Unit: - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_confirmed_flush_lsn_lag - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ - - """? - Metric #[0-9]+ - Descriptor: - -> Name: electric\.postgres\.replication\.slot_retained_wal_size - -> Description: - -> Unit: By - -> DataType: Gauge - NumberDataPoints #0 - StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC - Value: [-+.0-9]+ - """ -[invoke start_psql] - -[shell psql] - !create table items (val text); - ??CREATE - -[shell client] - [invoke curl_shape "http://localhost:3000/v1/shape?table=items&offset=-1"] - - ??HTTP/1.1 200 OK - ?electric-handle: ([\d-]+) - [global handle=$1] - ?electric-offset: ([\w\d_]+) - [global offset=$1] - -[shell psql] - !insert into items values ('3'); - ??INSERT - -[shell client] - [invoke curl_shape "http://localhost:3000/v1/shape?table=items&handle=$handle&offset=$offset&live"] - - ??HTTP/1.1 200 OK - ??"value":{"val":"3"} - -[shell otel_collector] - """? 
- Metric #[0-9]+ - Descriptor: - -> Name: electric\.storage\.transaction_stored\.bytes - """ -### - -[cleanup] - [invoke teardown] diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 4ddcdac477..3630827ba0 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -87,6 +87,8 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ last_value("process.memory.total", tags: [:process_type], unit: :byte), + last_value("process.memory.binary", tags: [:process_type], unit: :byte), + last_value("process.memory.avg_ref_count", tags: [:process_type]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), From 31e1048d4679a4a58506f611f0d84e24cc8611e9 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 17:18:28 +0200 Subject: [PATCH 06/22] Reduce mem usage in the integration test --- integration-tests/tests/otel-export.lux | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux index b42d958935..f99ddcd8c4 100644 --- a/integration-tests/tests/otel-export.lux +++ b/integration-tests/tests/otel-export.lux @@ -21,10 +21,10 @@ Process.set_label(:A_memory_hog) on_heap_strings = - Enum.map(1..1_000_000, fn _ -> String.duplicate("on heap", :random.uniform(8)) end) + Enum.map(1..4000, fn _ -> String.duplicate("on heap", 4000) end) off_heap_strings = - Enum.map(1..100_000, fn i -> String.duplicate("1234567890", 70 * i) end) + Enum.map(1..10, fn i -> String.duplicate("1234567890", 7000) end) receive do pid -> send(pid, {on_heap_strings, off_heap_strings}) From db1659a1152101bf5058805a69e459de06e6c183 Mon Sep 17 00:00:00 
2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 17:42:27 +0200 Subject: [PATCH 07/22] Fix lingering timeout problem in otel_collector's e2e tests --- integration-tests/tests/_macros.luxinc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/integration-tests/tests/_macros.luxinc b/integration-tests/tests/_macros.luxinc index ae34abbcca..54a0bbc0f8 100644 --- a/integration-tests/tests/_macros.luxinc +++ b/integration-tests/tests/_macros.luxinc @@ -231,11 +231,16 @@ -p 4318:4318 \ -v $(realpath ../support_files/otel-collector-config.yaml):/conf/otel-collector-config.yaml \ otel/opentelemetry-collector-contrib --config=/conf/otel-collector-config.yaml + # Allow time for docker to pull the image if not cached [timeout 120] + ??Starting HTTP server ?"endpoint": "(0\.0\.0\.0|\[::\]):4318" ??Everything is ready. Begin running and processing data. + + # Reset the timeout for subsequent pattern matching + [timeout 10] [endmacro] [macro teardown_container container_name] From 2e05371152cd2d6e19e126931c5462f97215d81b Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 15 Dec 2025 13:40:56 +0100 Subject: [PATCH 08/22] Cache lux on CI --- .github/workflows/integration_tests.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 6b0a63bc26..692af0e35d 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -28,6 +28,7 @@ jobs: - uses: actions/checkout@v4 - uses: erlef/setup-beam@v1 + id: setup_beam with: version-type: strict version-file: '.tool-versions' @@ -62,6 +63,15 @@ jobs: run: mix compile working-directory: packages/sync-service + - name: Cache lux + uses: actions/cache@v4 + with: + path: integration-tests/lux + key: '${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }}' + restore-keys: | + ${{ runner.os }}-lux-${{ steps.setup_beam.outputs.otp-version }} + ${{ runner.os }}-lux + - name: Setup lux run: make 
From 01429a35fc5383fce77814ab608d0e3b56dbb946 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 14:06:44 +0100 Subject: [PATCH 09/22] Include average number of binaries in top memory-heavy process metrics --- integration-tests/tests/otel-export.lux | 15 +++++++++++++++ .../electric/telemetry/application_telemetry.ex | 13 +++++++++---- .../lib/electric/telemetry/processes.ex | 17 ++++++++++++----- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux index f99ddcd8c4..188dbca465 100644 --- a/integration-tests/tests/otel-export.lux +++ b/integration-tests/tests/otel-export.lux @@ -49,6 +49,21 @@ """ # Verify the presence of process.memory.* metrics + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.memory\.avg_bin_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [.0-9]+ + """ + """? 
Metric #[0-9]+ Descriptor: diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 3630827ba0..5d19983273 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -88,6 +88,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do [ last_value("process.memory.total", tags: [:process_type], unit: :byte), last_value("process.memory.binary", tags: [:process_type], unit: :byte), + last_value("process.memory.avg_bin_count", tags: [:process_type]), last_value("process.memory.avg_ref_count", tags: [:process_type]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), @@ -174,12 +175,16 @@ defmodule ElectricTelemetry.ApplicationTelemetry do end def process_memory(%{intervals_and_thresholds: %{top_process_limit: process_limit}}) do - for %{type: type, proc_mem: proc_mem, binary_mem: binary_mem, avg_ref_count: avg_ref_count} <- - ElectricTelemetry.Processes.top_memory_by_type(process_limit) do + for map <- ElectricTelemetry.Processes.top_memory_by_type(process_limit) do :telemetry.execute( [:process, :memory], - %{total: proc_mem, binary: binary_mem, avg_ref_count: avg_ref_count}, - %{process_type: to_string(type)} + %{ + total: map.proc_mem, + binary: map.binary_mem, + avg_bin_count: map.avg_bin_count, + avg_ref_count: map.avg_ref_count + }, + %{process_type: to_string(map.type)} ) end end diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index add0f3e57e..612c847d4b 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -82,16 +82,23 @@ defmodule ElectricTelemetry.Processes do end defp 
mem_stats_for_procs(proc_infos) when is_list(proc_infos) do - {proc_mem, binary_mem, ref_count_sum, num_binaries} = - Enum.reduce(proc_infos, {0, 0, 0, 0}, fn map, - {proc_mem, binary_mem, ref_count_sum, num_binaries} -> - {proc_mem + map.proc_mem, binary_mem + map.binary_mem, ref_count_sum + map.ref_count_sum, - num_binaries + map.num_binaries} + {proc_mem, binary_mem, ref_count_sum, num_binaries, num_procs} = + Enum.reduce(proc_infos, {0, 0, 0, 0, 0}, fn map, + {proc_mem, binary_mem, ref_count_sum, + num_binaries, num_procs} -> + { + proc_mem + map.proc_mem, + binary_mem + map.binary_mem, + ref_count_sum + map.ref_count_sum, + num_binaries + map.num_binaries, + num_procs + 1 + } end) %{ proc_mem: proc_mem, binary_mem: binary_mem, + avg_bin_count: num_binaries / num_procs, avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries) } end From 931beeae43771a79ccfe1814aa142f4c7df0bd90 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 14:09:07 +0100 Subject: [PATCH 10/22] Fix electric-telemetry unit tests --- .../electric/telemetry/processes_test.exs | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index c4e8a324e7..dbd8a6ae8c 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -145,25 +145,33 @@ defmodule ElectricTelemetry.ProcessesTest do refute Process.alive?(pid1) - assert [%{memory: memory, type: :erlang}] = top_memory_by_type([pid1, pid2]) + assert [ + %{ + proc_mem: memory, + binary_mem: _, + avg_bin_count: _, + avg_ref_count: _, + type: :erlang + } + ] = top_memory_by_type([pid1, pid2]) assert is_integer(memory) end test "defaults to top 5 of all processes" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _}, - %{memory: 
_, type: _}, - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type() end test "allows for setting count limit" do assert [ - %{memory: _, type: _}, - %{memory: _, type: _} + %{proc_mem: _, type: _}, + %{proc_mem: _, type: _} ] = top_memory_by_type({:count, 2}) end @@ -178,11 +186,11 @@ defmodule ElectricTelemetry.ProcessesTest do assert length(results) >= 1 total_process_memory = :erlang.memory(:processes_used) - returned_memory = results |> Enum.map(& &1.memory) |> Enum.sum() + returned_memory = results |> Enum.map(& &1.proc_mem) |> Enum.sum() # Either we hit the 50% target or we ran out of groups above 1MiB assert returned_memory >= total_process_memory * 0.5 or - List.last(results).memory < 1024 * 1024 + List.last(results).proc_mem < 1024 * 1024 end end From 0faba58997116d3e1a702a2c2bdada16046b68cc Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 17 Dec 2025 14:22:31 +0100 Subject: [PATCH 11/22] Add changesets --- .changeset/eleven-nails-argue.md | 5 +++++ .changeset/forty-pillows-laugh.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 .changeset/eleven-nails-argue.md create mode 100644 .changeset/forty-pillows-laugh.md diff --git a/.changeset/eleven-nails-argue.md b/.changeset/eleven-nails-argue.md new file mode 100644 index 0000000000..147520356d --- /dev/null +++ b/.changeset/eleven-nails-argue.md @@ -0,0 +1,5 @@ +--- +'@core/electric-telemetry': patch +--- + +Add binary memory, average number of off-heap binaries and their ref counts to top processes by memory metric. diff --git a/.changeset/forty-pillows-laugh.md b/.changeset/forty-pillows-laugh.md new file mode 100644 index 0000000000..1ebb0e06a6 --- /dev/null +++ b/.changeset/forty-pillows-laugh.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Include stack_id in otel opts for stack metrics. 
It had been omitted by mistake before. From 14360920247f09ab0564cdaf888673e957c6eaaa Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Fri, 20 Mar 2026 16:42:03 +0100 Subject: [PATCH 12/22] Fix incomplete map in memory_from_info fallback case The fallback branch when info[:binary] is not a list was returning a map with only proc_mem, but the reduce function expects binary_mem, ref_count_sum, and num_binaries keys too. This would cause a KeyError if the branch were ever reached. Co-Authored-By: Claude Opus 4.6 --- packages/electric-telemetry/lib/electric/telemetry/processes.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 612c847d4b..668b1d7d34 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -168,7 +168,7 @@ defmodule ElectricTelemetry.Processes do } _ -> - %{proc_mem: memory} + %{proc_mem: memory, binary_mem: 0, ref_count_sum: 0, num_binaries: 0} end end From 00de98cc4969aebbe09ee680e0631dc0be2933fd Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 15:17:09 +0200 Subject: [PATCH 13/22] mix format --- .../sync-service/lib/electric/stack_supervisor/telemetry.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index f1a35bcc11..f163f1f7c4 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -158,7 +158,11 @@ defmodule Electric.StackSupervisor.Telemetry do existing_resource = Keyword.get(otel_opts_base, :resource, %{}) otel_opts = - Keyword.put(otel_opts_base, :resource, Map.put_new(existing_resource, :stack_id, config.stack_id)) + 
Keyword.put( + otel_opts_base, + :resource, + Map.put_new(existing_resource, :stack_id, config.stack_id) + ) telemetry_opts = config.telemetry_opts From 82f9df5700fcad4202d2214f94b3f9ea06991fbb Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:11:45 +0200 Subject: [PATCH 14/22] Include max_bin_count and max_ref_count for each process group --- .../telemetry/application_telemetry.ex | 2 ++ .../lib/electric/telemetry/processes.ex | 33 +++++++++++++------ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index 5d19983273..f63f9b7d08 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -181,7 +181,9 @@ defmodule ElectricTelemetry.ApplicationTelemetry do %{ total: map.proc_mem, binary: map.binary_mem, + max_bin_count: map.max_bin_count, avg_bin_count: map.avg_bin_count, + max_ref_count: map.max_ref_count, avg_ref_count: map.avg_ref_count }, %{process_type: to_string(map.type)} diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 668b1d7d34..1a4f5b31cc 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -82,14 +82,18 @@ defmodule ElectricTelemetry.Processes do end defp mem_stats_for_procs(proc_infos) when is_list(proc_infos) do - {proc_mem, binary_mem, ref_count_sum, num_binaries, num_procs} = - Enum.reduce(proc_infos, {0, 0, 0, 0, 0}, fn map, - {proc_mem, binary_mem, ref_count_sum, - num_binaries, num_procs} -> + {proc_mem, binary_mem, max_ref_count, ref_count_sum, max_num_binaries, num_binaries, + num_procs} = + Enum.reduce(proc_infos, {0, 0, 0, 0, 0, 0, 0}, fn map, + 
{proc_mem, binary_mem, max_ref_count, + ref_count_sum, max_num_binaries, + num_binaries, num_procs} -> { proc_mem + map.proc_mem, binary_mem + map.binary_mem, + max(max_ref_count, map.max_ref_count), ref_count_sum + map.ref_count_sum, + max(max_num_binaries, map.num_binaries), num_binaries + map.num_binaries, num_procs + 1 } @@ -98,7 +102,9 @@ defmodule ElectricTelemetry.Processes do %{ proc_mem: proc_mem, binary_mem: binary_mem, + max_bin_count: max_num_binaries, avg_bin_count: num_binaries / num_procs, + max_ref_count: max_ref_count, avg_ref_count: if(num_binaries == 0, do: 0, else: ref_count_sum / num_binaries) } end @@ -154,21 +160,28 @@ defmodule ElectricTelemetry.Processes do case info[:binary] do list when is_list(list) -> - {binary_mem, {ref_sum, num_entries}} = - Enum.reduce(list, {0, {0, 0}}, fn {_reference, size, ref_count}, - {total_size, {ref_sum, num_entries}} -> - {total_size + size, {ref_sum + ref_count, num_entries + 1}} + {binary_mem, max_ref_count, ref_count_sum, num_entries} = + Enum.reduce(list, {0, 0, 0, 0}, fn {_reference, bin_size, bin_ref_count}, + {binary_mem, max_ref_count, ref_count_sum, + num_entries} -> + { + binary_mem + bin_size, + max(max_ref_count, bin_ref_count), + ref_count_sum + bin_ref_count, + num_entries + 1 + } end) %{ proc_mem: memory, binary_mem: binary_mem, - ref_count_sum: ref_sum, + max_ref_count: max_ref_count, + ref_count_sum: ref_count_sum, num_binaries: num_entries } _ -> - %{proc_mem: memory, binary_mem: 0, ref_count_sum: 0, num_binaries: 0} + %{proc_mem: memory, binary_mem: 0, max_ref_count: 0, ref_count_sum: 0, num_binaries: 0} end end From ed3de5c2e337a0467499ae14cbb0b4e8da71fe91 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:24:04 +0200 Subject: [PATCH 15/22] Split process.memory into process.memory and process.bin_memory metrics process.memory.* now only exports total, sorted by aggregated process memory. 
The new process.bin_memory.* metric sorts process groups by referenced binary memory and exports: total, max_bin_count, avg_bin_count, max_ref_count, avg_ref_count. Co-Authored-By: Claude Opus 4.6 (1M context) Extract shared top_by/3 to deduplicate top_memory and top_bin_memory Co-Authored-By: Claude Opus 4.6 (1M context) wip --- integration-tests/tests/otel-export.lux | 40 ++++++++++++++-- .../telemetry/application_telemetry.ex | 22 +++++++-- .../lib/electric/telemetry/processes.ex | 46 +++++++++++-------- 3 files changed, 80 insertions(+), 28 deletions(-) diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux index 188dbca465..9c6f6671c3 100644 --- a/integration-tests/tests/otel-export.lux +++ b/integration-tests/tests/otel-export.lux @@ -48,11 +48,11 @@ ScopeMetrics #0 """ - # Verify the presence of process.memory.* metrics + # Verify the presence of process.bin_memory.* metrics """? Metric #[0-9]+ Descriptor: - -> Name: process\.memory\.avg_bin_count + -> Name: process\.bin_memory\.avg_bin_count -> Description: -> Unit: -> DataType: Gauge @@ -67,7 +67,7 @@ """? Metric #[0-9]+ Descriptor: - -> Name: process\.memory\.avg_ref_count + -> Name: process\.bin_memory\.avg_ref_count -> Description: -> Unit: -> DataType: Gauge @@ -82,7 +82,37 @@ """? Metric #[0-9]+ Descriptor: - -> Name: process\.memory\.binary + -> Name: process\.bin_memory\.max_bin_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? 
+ Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.max_ref_count + -> Description: + -> Unit: + -> DataType: Gauge + NumberDataPoints #0 + Data point attributes: + -> process_type: Str\(A_memory_hog\) + StartTimestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC + Value: [0-9]+ + """ + + """? + Metric #[0-9]+ + Descriptor: + -> Name: process\.bin_memory\.total -> Description: -> Unit: By -> DataType: Gauge @@ -94,6 +124,7 @@ Value: [0-9]+ """ + # Verify the presence of process.memory.total metric """? Metric #[0-9]+ Descriptor: @@ -162,6 +193,7 @@ Timestamp: [-0-9]+ [.:0-9]+ [-+0-9]+ UTC Value: [-+.0-9]+ """ + [invoke start_psql] [shell psql] diff --git a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex index f63f9b7d08..4cd5bca788 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/application_telemetry.ex @@ -76,6 +76,7 @@ defmodule ElectricTelemetry.ApplicationTelemetry do :garbage_collection, :reductions, :process_memory, + :process_bin_memory, :ets_memory, :get_system_load_average, :get_system_memory_usage @@ -87,9 +88,11 @@ defmodule ElectricTelemetry.ApplicationTelemetry do def metrics(telemetry_opts) do [ last_value("process.memory.total", tags: [:process_type], unit: :byte), - last_value("process.memory.binary", tags: [:process_type], unit: :byte), - last_value("process.memory.avg_bin_count", tags: [:process_type]), - last_value("process.memory.avg_ref_count", tags: [:process_type]), + last_value("process.bin_memory.total", tags: [:process_type], unit: :byte), + last_value("process.bin_memory.max_bin_count", tags: [:process_type]), + last_value("process.bin_memory.avg_bin_count", tags: [:process_type]), + last_value("process.bin_memory.max_ref_count", tags: [:process_type]), + 
last_value("process.bin_memory.avg_ref_count", tags: [:process_type]), last_value("ets.memory.total", tags: [:table_type], unit: :byte), last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), @@ -178,9 +181,18 @@ defmodule ElectricTelemetry.ApplicationTelemetry do for map <- ElectricTelemetry.Processes.top_memory_by_type(process_limit) do :telemetry.execute( [:process, :memory], + %{total: map.proc_mem}, + %{process_type: to_string(map.type)} + ) + end + end + + def process_bin_memory(%{intervals_and_thresholds: %{top_process_limit: process_limit}}) do + for map <- ElectricTelemetry.Processes.top_bin_memory_by_type(process_limit) do + :telemetry.execute( + [:process, :bin_memory], %{ - total: map.proc_mem, - binary: map.binary_mem, + total: map.binary_mem, max_bin_count: map.max_bin_count, avg_bin_count: map.avg_bin_count, max_ref_count: map.max_ref_count, diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 1a4f5b31cc..7905533439 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -1,10 +1,12 @@ defmodule ElectricTelemetry.Processes do + @type limit :: {:count, pos_integer()} | {:mem_percent, 1..100} + @default_count 5 + @default_limit {:count, @default_count} + # Minimum memory threshold for a process group when using :mem_percent mode. 
@min_group_memory 1024 * 1024 - @type limit :: {:count, pos_integer()} | {:mem_percent, 1..100} - defguardp is_valid_mem_percent(percent) when is_integer(percent) and percent >= 1 and percent <= 100 @@ -18,38 +20,44 @@ defmodule ElectricTelemetry.Processes do def proc_type(pid), do: proc_type(pid, info(pid)) - def top_memory_by_type do - top_memory_by_type(Process.list(), {:count, @default_count}) - end + def top_memory_by_type, do: top_by(:proc_mem) + def top_memory_by_type(proc_list_or_limit), do: top_by(:proc_mem, proc_list_or_limit) + def top_memory_by_type(proc_list, limit), do: top_by(:proc_mem, proc_list, limit) - def top_memory_by_type({_, _} = limit) do - top_memory_by_type(Process.list(), limit) - end + def top_bin_memory_by_type, do: top_by(:binary_mem) + def top_bin_memory_by_type({_, _} = limit), do: top_by(:binary_mem, limit) + def top_bin_memory_by_type(proc_list_or_limit), do: top_by(:binary_mem, proc_list_or_limit) + def top_bin_memory_by_type(proc_list, limit), do: top_by(:binary_mem, proc_list, limit) - def top_memory_by_type(process_list) when is_list(process_list) do - top_memory_by_type(process_list, {:count, @default_count}) - end + def top_by(sort_key), do: top_by(sort_key, Process.list(), @default_limit) + def top_by(sort_key, {_, _} = limit), do: top_by(sort_key, Process.list(), limit) + + def top_by(sort_key, proc_list) when is_list(proc_list), + do: top_by(sort_key, proc_list, @default_limit) - def top_memory_by_type(process_list, {:count, count}) - when is_list(process_list) and is_integer(count) and count > 0 do + defp top_by(sort_key, process_list, {:count, count}) + when is_integer(count) and count > 0 do process_list - |> sorted_groups() + |> sorted_groups(sort_key) |> Enum.take(count) end - def top_memory_by_type(process_list, {:mem_percent, percent}) - when is_list(process_list) and is_valid_mem_percent(percent) do + # When sortying by binary mem, processes double-count the same refc binary, so it doesn't + # make sense to talk 
about a "percentage of the total" in that case. + # Instead, for binary memory telemetry the low cutoff threshold should be provided. + defp top_by(:proc_mem, process_list, {:mem_percent, percent}) + when is_valid_mem_percent(percent) do # :processes_used excludes memory allocated but not yet used by process heaps, # giving a more accurate baseline for the percentage calculation. total_process_memory = :erlang.memory(:processes_used) target = total_process_memory * percent / 100 process_list - |> sorted_groups() + |> sorted_groups(:proc_mem) |> take_until_target(target) end - defp sorted_groups(process_list) do + defp sorted_groups(process_list, sort_key) do process_list |> Enum.map(&type_and_memory/1) |> Enum.reject(&(&1.type == :dead)) @@ -59,7 +67,7 @@ defmodule ElectricTelemetry.Processes do |> mem_stats_for_procs() |> Map.put(:type, type) end) - |> Enum.sort_by(&(-&1.proc_mem)) + |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end defp take_until_target(proc_groups, target) do From a9dcbaed26426644ecf40abeff4f65acf4570254 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:44:03 +0200 Subject: [PATCH 16/22] Add {:at_least_bytes, n} limit variant for top process queries The new limit includes all process groups whose aggregated memory is at least n bytes. This is useful for binary memory telemetry where a percentage-of-total doesn't make sense due to refc binary double-counting. Also makes take_until_target accept the low cutoff as an argument instead of using the hardcoded @min_group_memory. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../lib/electric/telemetry/opts.ex | 3 ++- .../lib/electric/telemetry/processes.ex | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/opts.ex b/packages/electric-telemetry/lib/electric/telemetry/opts.ex index 189aea5053..84fc18ce3b 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/opts.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/opts.ex @@ -31,7 +31,8 @@ defmodule ElectricTelemetry.Opts do [ {:in, [:mem_percent]}, {:custom, ElectricTelemetry.Processes, :validate_mem_percent, []} - ]} + ]}, + {:tuple, [{:in, [:at_least_bytes]}, :non_neg_integer]} ]}, default: {:count, 5} ], diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 7905533439..2030080953 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -1,5 +1,6 @@ defmodule ElectricTelemetry.Processes do - @type limit :: {:count, pos_integer()} | {:mem_percent, 1..100} + @type limit :: + {:count, pos_integer()} | {:mem_percent, 1..100} | {:at_least_bytes, non_neg_integer()} @default_count 5 @default_limit {:count, @default_count} @@ -42,7 +43,7 @@ defmodule ElectricTelemetry.Processes do |> Enum.take(count) end - # When sortying by binary mem, processes double-count the same refc binary, so it doesn't + # When sorting by binary mem, processes double-count the same refc binary, so it doesn't # make sense to talk about a "percentage of the total" in that case. # Instead, for binary memory telemetry the low cutoff threshold should be provided. 
defp top_by(:proc_mem, process_list, {:mem_percent, percent}) @@ -54,7 +55,14 @@ defmodule ElectricTelemetry.Processes do process_list |> sorted_groups(:proc_mem) - |> take_until_target(target) + |> take_until_target(target, @min_group_memory) + end + + defp top_by(sort_key, process_list, {:at_least_bytes, low_cutoff}) + when is_integer(low_cutoff) and low_cutoff >= 0 do + process_list + |> sorted_groups(sort_key) + |> take_until_target(:infinity, low_cutoff) end defp sorted_groups(process_list, sort_key) do @@ -70,16 +78,15 @@ defmodule ElectricTelemetry.Processes do |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end - defp take_until_target(proc_groups, target) do + defp take_until_target(proc_groups, target, low_cutoff) do {_running_total, selected_groups} = Enum.reduce_while(proc_groups, {0, []}, fn _proc_group, {running_total, acc} when running_total >= target -> {:halt, {running_total, acc}} - proc_group, {running_total, acc} when proc_group.memory < @min_group_memory -> + proc_group, {running_total, acc} when proc_group.proc_mem < low_cutoff -> # Include this last process group in the result so it's clear to the caller that the - # minimum group memory threshold has been reached earlier than the target total mem - # one. + # low cutoff threshold has been reached earlier than the target total mem one. {:halt, {running_total, [proc_group | acc]}} proc_group, {running_total, acc} -> From 933d22ce48db337e9a8dfdb2eba666b9ab782857 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:46:40 +0200 Subject: [PATCH 17/22] Use sort_key for cutoff comparison and running total in take_until_target When sorting by binary_mem, the low cutoff and accumulator should compare against binary_mem, not proc_mem. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../lib/electric/telemetry/processes.ex | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/processes.ex b/packages/electric-telemetry/lib/electric/telemetry/processes.ex index 2030080953..ed07365dad 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/processes.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/processes.ex @@ -55,14 +55,14 @@ defmodule ElectricTelemetry.Processes do process_list |> sorted_groups(:proc_mem) - |> take_until_target(target, @min_group_memory) + |> take_until_target(:proc_mem, target, @min_group_memory) end defp top_by(sort_key, process_list, {:at_least_bytes, low_cutoff}) when is_integer(low_cutoff) and low_cutoff >= 0 do process_list |> sorted_groups(sort_key) - |> take_until_target(:infinity, low_cutoff) + |> take_until_target(sort_key, :infinity, low_cutoff) end defp sorted_groups(process_list, sort_key) do @@ -78,19 +78,22 @@ defmodule ElectricTelemetry.Processes do |> Enum.sort_by(&(-Map.fetch!(&1, sort_key))) end - defp take_until_target(proc_groups, target, low_cutoff) do + defp take_until_target(proc_groups, sort_key, target, low_cutoff) do {_running_total, selected_groups} = Enum.reduce_while(proc_groups, {0, []}, fn _proc_group, {running_total, acc} when running_total >= target -> {:halt, {running_total, acc}} - proc_group, {running_total, acc} when proc_group.proc_mem < low_cutoff -> - # Include this last process group in the result so it's clear to the caller that the - # low cutoff threshold has been reached earlier than the target total mem one. 
- {:halt, {running_total, [proc_group | acc]}} - proc_group, {running_total, acc} -> - {:cont, {running_total + proc_group.proc_mem, [proc_group | acc]}} + value = Map.fetch!(proc_group, sort_key) + + if value < low_cutoff do + # Include this last process group in the result so it's clear to the caller that the + # low cutoff threshold has been reached earlier than the target total mem one. + {:halt, {running_total, [proc_group | acc]}} + else + {:cont, {running_total + value, [proc_group | acc]}} + end end) Enum.reverse(selected_groups) From a9b3fb89ba21212113d79178409ceb3ed7e5a9e2 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:48:57 +0200 Subject: [PATCH 18/22] Add tests for top_bin_memory_by_type and {:at_least_bytes, n} limit Co-Authored-By: Claude Opus 4.6 (1M context) --- .../electric/telemetry/processes_test.exs | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index dbd8a6ae8c..b311cec39a 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -194,6 +194,80 @@ defmodule ElectricTelemetry.ProcessesTest do end end + describe "top_bin_memory_by_type/[0, 1, 2]" do + import ElectricTelemetry.Processes, + only: [top_bin_memory_by_type: 0, top_bin_memory_by_type: 1, top_bin_memory_by_type: 2] + + test "defaults to top 5 sorted by binary_mem" do + results = top_bin_memory_by_type() + assert length(results) == 5 + binary_mems = Enum.map(results, & &1.binary_mem) + assert binary_mems == Enum.sort(binary_mems, :desc) + end + + test "sorts by binary_mem, not proc_mem" do + # Spawn a process with large binary memory but small heap + spawn_with_label(:big_binary, fn -> + # Store a large binary ref — this inflates binary_mem + Process.put(:bin, :crypto.strong_rand_bytes(2 * 1024 
* 1024)) + end) + + # Spawn a process with large heap but no binary memory + spawn_with_label(:big_heap, fn -> + # Build a large non-binary term to inflate proc_mem + Process.put(:list, Enum.to_list(1..200_000)) + end) + + proc_mem_results = + ElectricTelemetry.Processes.top_memory_by_type({:count, 100}) + + bin_mem_results = top_bin_memory_by_type({:count, 100}) + + proc_mem_order = Enum.map(proc_mem_results, & &1.type) + bin_mem_order = Enum.map(bin_mem_results, & &1.type) + + # The two orderings should differ since the processes have + # inverted proc_mem vs binary_mem rankings + assert proc_mem_order != bin_mem_order + end + + test "at_least_bytes stops at the cutoff" do + spawn_with_label(:bin_large, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(2 * 1024 * 1024)) + end) + + spawn_with_label(:bin_small, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(100)) + end) + + results = top_bin_memory_by_type({:at_least_bytes, 1024 * 1024}) + + # The last entry should be the one that fell below the cutoff + last = List.last(results) + above_cutoff = Enum.drop(results, -1) + + assert Enum.all?(above_cutoff, &(&1.binary_mem >= 1024 * 1024)) + assert last.binary_mem < 1024 * 1024 + end + + test "at_least_bytes with process list" do + pid1 = + spawn_with_label(:with_bin, fn -> + Process.put(:bin, :crypto.strong_rand_bytes(512 * 1024)) + end) + + pid2 = + spawn_with_label(:without_bin, fn -> + :ok + end) + + results = top_bin_memory_by_type([pid1, pid2], {:at_least_bytes, 1024}) + + types = Enum.map(results, & &1.type) + assert :with_bin in types + end + end + defp spawn_with_label(label, fun \\ fn -> nil end) do parent = self() From 22a670fe2e75aa970eebe3cf6ddc5fd60e06a864 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 13 Apr 2026 16:53:22 +0200 Subject: [PATCH 19/22] Move Processes imports to module level and improve sort order test The sort order test now independently verifies each list is sorted by its respective key and asserts the top entries 
differ. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../electric/telemetry/processes_test.exs | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs index b311cec39a..22462b5b51 100644 --- a/packages/electric-telemetry/test/electric/telemetry/processes_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/processes_test.exs @@ -1,7 +1,7 @@ defmodule ElectricTelemetry.ProcessesTest do use ExUnit.Case, async: true - import ElectricTelemetry.Processes, only: [proc_type: 1] + import ElectricTelemetry.Processes describe "proc_type/1 with binary labels" do test "groups request labels by method and path, stripping query and request id" do @@ -120,8 +120,6 @@ defmodule ElectricTelemetry.ProcessesTest do end describe "top_memory_by_type/[1, 2]" do - import ElectricTelemetry.Processes, only: [top_memory_by_type: 0, top_memory_by_type: 1] - test "handles dead processes" do parent = self() @@ -195,9 +193,6 @@ defmodule ElectricTelemetry.ProcessesTest do end describe "top_bin_memory_by_type/[0, 1, 2]" do - import ElectricTelemetry.Processes, - only: [top_bin_memory_by_type: 0, top_bin_memory_by_type: 1, top_bin_memory_by_type: 2] - test "defaults to top 5 sorted by binary_mem" do results = top_bin_memory_by_type() assert length(results) == 5 @@ -208,27 +203,26 @@ defmodule ElectricTelemetry.ProcessesTest do test "sorts by binary_mem, not proc_mem" do # Spawn a process with large binary memory but small heap spawn_with_label(:big_binary, fn -> - # Store a large binary ref — this inflates binary_mem Process.put(:bin, :crypto.strong_rand_bytes(2 * 1024 * 1024)) end) # Spawn a process with large heap but no binary memory spawn_with_label(:big_heap, fn -> - # Build a large non-binary term to inflate proc_mem Process.put(:list, Enum.to_list(1..200_000)) end) - proc_mem_results = - 
ElectricTelemetry.Processes.top_memory_by_type({:count, 100}) - + proc_mem_results = top_memory_by_type({:count, 100}) bin_mem_results = top_bin_memory_by_type({:count, 100}) - proc_mem_order = Enum.map(proc_mem_results, & &1.type) - bin_mem_order = Enum.map(bin_mem_results, & &1.type) + # Each list is sorted by its own key + proc_mems = Enum.map(proc_mem_results, & &1.proc_mem) + assert proc_mems == Enum.sort(proc_mems, :desc) + + binary_mems = Enum.map(bin_mem_results, & &1.binary_mem) + assert binary_mems == Enum.sort(binary_mems, :desc) - # The two orderings should differ since the processes have - # inverted proc_mem vs binary_mem rankings - assert proc_mem_order != bin_mem_order + # The top entry in each list should be different + assert hd(proc_mem_results).type != hd(bin_mem_results).type end test "at_least_bytes stops at the cutoff" do From 9db1cc8a6fb5c375d134fbf4169bcc8cd7a2c36f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 14 Apr 2026 12:48:30 +0200 Subject: [PATCH 20/22] Make stack telemetry init delay configurable via ELECTRIC_STACK_TELEMETRY_INIT_DELAY The default remains 30s for production. Integration tests set it to 1s so stack metrics are exported quickly enough for test assertions. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- integration-tests/tests/otel-export.lux | 2 +- .../electric-telemetry/lib/electric/telemetry/opts.ex | 1 + .../lib/electric/telemetry/stack_telemetry.ex | 2 +- packages/sync-service/config/runtime.exs | 8 ++++++++ packages/sync-service/lib/electric/application.ex | 1 + 5 files changed, 12 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/otel-export.lux b/integration-tests/tests/otel-export.lux index 9c6f6671c3..c3a2cf5928 100644 --- a/integration-tests/tests/otel-export.lux +++ b/integration-tests/tests/otel-export.lux @@ -10,7 +10,7 @@ [invoke setup_pg] -[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] +[invoke setup_electric_with_env "ELECTRIC_OTLP_ENDPOINT=http://localhost:4318 ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL=1s ELECTRIC_STACK_TELEMETRY_INIT_DELAY=1s ELECTRIC_OTEL_EXPORT_PERIOD=2s DO_NOT_START_CONN_MAN_PING=1 ELECTRIC_LOG_LEVEL=info OTEL_RESOURCE_ATTRIBUTES=custom.attr=electric.val"] # Spawn a process containing off-heap binary references and ensure its in the top 5 by memory footprint. 
# Otel Collector sorts metrics in its debug output, so the process label for our process needs diff --git a/packages/electric-telemetry/lib/electric/telemetry/opts.ex b/packages/electric-telemetry/lib/electric/telemetry/opts.ex index 84fc18ce3b..7805d1d7fb 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/opts.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/opts.ex @@ -22,6 +22,7 @@ defmodule ElectricTelemetry.Opts do default: [], keys: [ system_metrics_poll_interval: [type: :integer, default: :timer.seconds(5)], + stack_telemetry_init_delay: [type: :integer, default: :timer.seconds(30)], top_process_limit: [ type: {:or, diff --git a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex index 3e2723b90b..fc209021ce 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/stack_telemetry.ex @@ -38,7 +38,7 @@ defmodule ElectricTelemetry.StackTelemetry do [ ElectricTelemetry.Poller.child_spec(opts, callback_module: __MODULE__, - init_delay: :timer.seconds(30) + init_delay: opts.intervals_and_thresholds.stack_telemetry_init_delay ) ], disk_usage_child_specs(opts), diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 892b55cafd..053434cfdf 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -177,6 +177,13 @@ otel_export_period = nil ) +stack_telemetry_init_delay = + env!( + "ELECTRIC_STACK_TELEMETRY_INIT_DELAY", + &Electric.Config.parse_human_readable_time!/1, + nil + ) + # The provided database id is relevant if you had used v0.8 and want to keep the storage # instead of having hanging files. We use a provided value as stack id, but nothing else. 
provided_database_id = env!("ELECTRIC_DATABASE_ID", :string, nil) @@ -214,6 +221,7 @@ config :electric, call_home_telemetry?: env!("ELECTRIC_USAGE_REPORTING", :boolean, config_env() == :prod), telemetry_url: call_home_telemetry_url, system_metrics_poll_interval: system_metrics_poll_interval, + stack_telemetry_init_delay: stack_telemetry_init_delay, otel_export_period: otel_export_period, otel_sampling_ratio: env!("ELECTRIC_OTEL_SAMPLING_RATIO", :float, nil), metrics_sampling_ratio: env!("ELECTRIC_METRICS_SAMPLING_RATIO", :float, nil), diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 2d22006f25..9a613c8893 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -381,6 +381,7 @@ defmodule Electric.Application do intervals_and_thresholds: get_opts(opts, system_metrics_poll_interval: :system_metrics_poll_interval, + stack_telemetry_init_delay: :stack_telemetry_init_delay, long_gc_threshold: :telemetry_long_gc_threshold, long_schedule_threshold: :telemetry_long_schedule_threshold, long_message_queue_enable_threshold: :telemetry_long_message_queue_enable_threshold, From e8c948d37a50ecadf11706e221cde95864710497 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 14 Apr 2026 13:24:56 +0200 Subject: [PATCH 21/22] Remove MIX_OS_DEPS_COMPILE_PARTITION_COUNT from the Dockerfile CI seems to be having trouble with this configuration option --- packages/sync-service/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/Dockerfile b/packages/sync-service/Dockerfile index 3df7f90457..820c64c30d 100644 --- a/packages/sync-service/Dockerfile +++ b/packages/sync-service/Dockerfile @@ -25,7 +25,7 @@ COPY --from=electric-telemetry / /builder/electric-telemetry COPY mix.* /builder/electric/ RUN mix deps.get -RUN MIX_OS_DEPS_COMPILE_PARTITION_COUNT=4 mix deps.compile +RUN mix deps.compile # These are 
ordered by change frequency, with the least frequently changing dir first. COPY rel /builder/electric/rel From 5b9694e24377af4dcc6a30ba92fb3de4d6f288db Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 14 Apr 2026 15:22:58 +0200 Subject: [PATCH 22/22] Revert "Remove MIX_OS_DEPS_COMPILE_PARTITION_COUNT from the Dockerfile" This reverts commit e8c948d37a50ecadf11706e221cde95864710497. --- packages/sync-service/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/Dockerfile b/packages/sync-service/Dockerfile index 820c64c30d..3df7f90457 100644 --- a/packages/sync-service/Dockerfile +++ b/packages/sync-service/Dockerfile @@ -25,7 +25,7 @@ COPY --from=electric-telemetry / /builder/electric-telemetry COPY mix.* /builder/electric/ RUN mix deps.get -RUN mix deps.compile +RUN MIX_OS_DEPS_COMPILE_PARTITION_COUNT=4 mix deps.compile # These are ordered by change frequency, with the least frequently changing dir first. COPY rel /builder/electric/rel