diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 1c3540606662..c5e5ca15645b 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -1446,6 +1446,9 @@ PROMETHEUS_ENABLED = env.bool("PROMETHEUS_ENABLED", False) +if PROMETHEUS_ENABLED: + MIDDLEWARE.append("core.middleware.worker_rss.WorkerRSSMiddleware") + DOCGEN_MODE = env.bool("DOCGEN_MODE", default=False) REQUIRE_AUTHENTICATION_FOR_API_DOCS = env.bool( diff --git a/api/core/middleware/worker_rss.py b/api/core/middleware/worker_rss.py new file mode 100644 index 000000000000..cd883f5df9ec --- /dev/null +++ b/api/core/middleware/worker_rss.py @@ -0,0 +1,16 @@ +from django.http import HttpRequest, HttpResponse + +from metrics.worker_metrics import update_worker_metrics + + +class WorkerRSSMiddleware: + def __init__(self, get_response): # type: ignore[no-untyped-def] + self.get_response = get_response + + def __call__(self, request: HttpRequest) -> HttpResponse: + response = self.get_response(request) + try: + update_worker_metrics() + except Exception: + pass + return response diff --git a/api/metrics/worker_metrics.py b/api/metrics/worker_metrics.py new file mode 100644 index 000000000000..ce6d1e9a705e --- /dev/null +++ b/api/metrics/worker_metrics.py @@ -0,0 +1,86 @@ +import os +from pathlib import Path +from typing import Iterable + +import prometheus_client + +PROC_SELF_STATUS_PATH = Path("/proc/self/status") +MAX_RSS_KB_TO_BYTES = 1024 +MAX_RSS_STATUS_FIELD = "VmHWM" + +flagsmith_worker_rss_bytes = prometheus_client.Gauge( + "flagsmith_worker_rss_bytes", + "Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status.", + ["pid"], + multiprocess_mode="liveall", +) + + +def update_worker_metrics() -> None: + """ + Update the RSS gauge with the current worker process high-water mark. + """ + current_pid = os.getpid() + + rss_value = get_current_process_max_rss_bytes() + if rss_value is not None: + flagsmith_worker_rss_bytes.labels(pid=str(current_pid)).set(rss_value) + + +def clear_worker_metrics() -> None: + """ + Clear the RSS memory usage metric for the current worker process. + This should be called when a worker process is shutting down to prevent stale metrics. + """ + current_pid = os.getpid() + try: + flagsmith_worker_rss_bytes.remove(pid=str(current_pid)) + except (KeyError, ValueError): + pass + + +def get_current_process_max_rss_bytes() -> int | None: + try: + proc_status_lines = PROC_SELF_STATUS_PATH.read_text( + encoding="utf-8" + ).splitlines() + except (FileNotFoundError, OSError, UnicodeDecodeError): + return None + + max_rss_kb = _get_proc_status_memory_kb(proc_status_lines, MAX_RSS_STATUS_FIELD) + if max_rss_kb is None: + return None + + return max_rss_kb * MAX_RSS_KB_TO_BYTES + + +def _get_proc_status_memory_kb( + proc_status_lines: Iterable[str], + field_name: str, +) -> int | None: + for line in proc_status_lines: + name, separator, value = line.strip().partition(":") + if separator and name == field_name: + return _parse_proc_status_memory_kb(value) + + return None + + +def _parse_proc_status_memory_kb(value: str) -> int | None: + parts = value.split() + if len(parts) != 2: + return None + + memory_kb_text, unit = parts + if unit != "kB": + return None + + try: + memory_kb = int(memory_kb_text) + except ValueError: + return None + + if memory_kb < 0: + return None + + return memory_kb diff --git a/api/tests/integration/core/test_integration_core_worker_rss_metric.py b/api/tests/integration/core/test_integration_core_worker_rss_metric.py new file mode 100644 index 000000000000..92f2c06d0817 --- /dev/null +++ b/api/tests/integration/core/test_integration_core_worker_rss_metric.py @@ -0,0 +1,46 @@ +import os + +from django.conf import settings as django_settings +from django.test import Client, override_settings +from prometheus_client import REGISTRY, generate_latest +from pytest_mock import MockerFixture + +from metrics.worker_metrics import clear_worker_metrics + + +@override_settings( + MIDDLEWARE=[ + *django_settings.MIDDLEWARE, + "core.middleware.worker_rss.WorkerRSSMiddleware", + ] +) +def test_worker_rss_metric__request_through_middleware__appears_in_prometheus_output( + client: Client, + mocker: MockerFixture, +) -> None: + # Given - deterministic RSS reading so the test is independent of /proc availability + # on macOS/Windows CI runners. + expected_rss = 12_345_678 + mocker.patch( + "metrics.worker_metrics.get_current_process_max_rss_bytes", + return_value=expected_rss, + ) + + # When - any cheap, known-reachable endpoint trips the middleware after response. + response = client.get("/api/v1/swagger.json", HTTP_ACCEPT="application/json") + + # Then - the response is unaffected by the middleware, and the gauge is exposed + # with a sample for the current worker's PID via the Prometheus exposition format. + assert response.status_code == 200 + output = generate_latest(REGISTRY).decode() + assert "flagsmith_worker_rss_bytes" in output + assert f'pid="{os.getpid()}"' in output + + +def teardown_function(function: object) -> None: + # Prevent labelled-child leakage to other tests in the same xdist worker by removing + # this PID's sample after each test. Uses the existing module API. + try: + clear_worker_metrics() + except Exception: + pass diff --git a/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py b/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py new file mode 100644 index 000000000000..f9b4bb71fd64 --- /dev/null +++ b/api/tests/unit/core/middleware/test_unit_core_middleware_worker_rss.py @@ -0,0 +1,53 @@ +from django.http import HttpResponse + +from core.middleware.worker_rss import WorkerRSSMiddleware + + +def test_worker_rss_middleware__any_request__calls_update_after_response(mocker): # type: ignore[no-untyped-def] + # Given + call_order = [] + + def fake_get_response(request): # type: ignore[no-untyped-def] + call_order.append("handled") + return HttpResponse() + + mocker.patch( + "core.middleware.worker_rss.update_worker_metrics", + side_effect=lambda: call_order.append("updated"), + ) + middleware = WorkerRSSMiddleware(fake_get_response) + + # When + middleware(mocker.MagicMock()) + + # Then — metric must be updated after the request is handled, not before + assert call_order == ["handled", "updated"] + + +def test_worker_rss_middleware__any_request__returns_response_unchanged(mocker): # type: ignore[no-untyped-def] + # Given + expected_response = HttpResponse(status=200) + mocker.patch("core.middleware.worker_rss.update_worker_metrics") + middleware = WorkerRSSMiddleware(lambda _request: expected_response) + + # When + result = middleware(mocker.MagicMock()) + + # Then + assert result is expected_response + + +def test_worker_rss_middleware__update_raises__request_still_completes(mocker): # type: ignore[no-untyped-def] + # Given + expected_response = HttpResponse(status=200) + mocker.patch( + "core.middleware.worker_rss.update_worker_metrics", + side_effect=Exception("metric failure"), + ) + middleware = WorkerRSSMiddleware(lambda _request: expected_response) + + # When + result = middleware(mocker.MagicMock()) + + # Then — exception is swallowed, response still returned + assert result is expected_response diff --git a/api/tests/unit/metrics/test_unit_worker_metrics.py b/api/tests/unit/metrics/test_unit_worker_metrics.py new file mode 100644 index 000000000000..4ae24a02f808 --- /dev/null +++ b/api/tests/unit/metrics/test_unit_worker_metrics.py @@ -0,0 +1,243 @@ +from pathlib import Path + +import pytest + +from metrics import worker_metrics + + +class MockGaugeLabels: + def __init__(self): + self.set_called_with = None + + def set(self, value): + self.set_called_with = value + + +class MockGauge: + def __init__(self): + self.labels_called_with = None + self.remove_called_with = None + self.mock_labels = MockGaugeLabels() + self.should_raise_on_remove = None + + def labels(self, *, pid): + self.labels_called_with = pid + return self.mock_labels + + def remove(self, *, pid): + self.remove_called_with = pid + if self.should_raise_on_remove: + raise self.should_raise_on_remove + + +class UnreadableStatusPath: + def read_text(self, encoding: str) -> str: + raise OSError("status file unavailable") + + +def test_get_current_process_max_rss_bytes__vmhwm_available__returns_bytes( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + max_rss_kb = 123 + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\nVmHWM:\t{max_rss_kb} kB\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result == max_rss_kb * worker_metrics.MAX_RSS_KB_TO_BYTES + + +def test_get_current_process_max_rss_bytes__vmhwm_has_extra_whitespace__returns_bytes( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + max_rss_kb = 456 + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\n VmHWM: {max_rss_kb} kB \nVmRSS:\t10 kB\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result == max_rss_kb * worker_metrics.MAX_RSS_KB_TO_BYTES + + +def test_get_current_process_max_rss_bytes__status_file_missing__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", tmp_path / "missing") + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_get_current_process_max_rss_bytes__vmhwm_missing__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + # Given + status_path = tmp_path / "status" + status_path.write_text("Name:\tgunicorn\nVmRSS:\t10 kB\n", encoding="utf-8") + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +@pytest.mark.parametrize( + "vmhwm_value", + [ + "-1 kB", + "not-a-number kB", + "123 MB", + "123", + "123 kB extra", + ], +) +def test_get_current_process_max_rss_bytes__vmhwm_invalid__returns_none( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + vmhwm_value: str, +) -> None: + # Given + status_path = tmp_path / "status" + status_path.write_text( + f"Name:\tgunicorn\nVmHWM:\t{vmhwm_value}\n", + encoding="utf-8", + ) + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", status_path) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_get_current_process_max_rss_bytes__status_file_read_error__returns_none( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.setattr(worker_metrics, "PROC_SELF_STATUS_PATH", UnreadableStatusPath()) + + # When + result = worker_metrics.get_current_process_max_rss_bytes() + + # Then + assert result is None + + +def test_update_worker_metrics__rss_available__updates_gauge( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_rss = 1048576 # 1 MB + mock_pid = 12345 + mock_gauge = MockGauge() + + monkeypatch.setattr( + worker_metrics, "get_current_process_max_rss_bytes", lambda: mock_rss + ) + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.update_worker_metrics() + + # Then + assert mock_gauge.labels_called_with == str(mock_pid) + assert mock_gauge.mock_labels.set_called_with == mock_rss + + +def test_update_worker_metrics__rss_none__does_not_update_gauge( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 12345 + mock_gauge = MockGauge() + + monkeypatch.setattr( + worker_metrics, "get_current_process_max_rss_bytes", lambda: None + ) + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.update_worker_metrics() + + # Then + assert mock_gauge.labels_called_with is None + + +def test_clear_worker_metrics__label_present__removes_gauge_label( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) + + +def test_clear_worker_metrics__keyerror__silently_handles( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + mock_gauge.should_raise_on_remove = KeyError("Label not found") + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When/Then (should not raise) + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) + + +def test_clear_worker_metrics__valueerror__silently_handles( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + mock_pid = 67890 + mock_gauge = MockGauge() + mock_gauge.should_raise_on_remove = ValueError("Invalid label") + + monkeypatch.setattr(worker_metrics.os, "getpid", lambda: mock_pid) + monkeypatch.setattr(worker_metrics, "flagsmith_worker_rss_bytes", mock_gauge) + + # When/Then (should not raise) + worker_metrics.clear_worker_metrics() + + # Then + assert mock_gauge.remove_called_with == str(mock_pid) diff --git a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md index b931a958595e..96638e6f6579 100644 --- a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md @@ -133,3 +133,14 @@ Labels: - `task_type` - `result` +### `flagsmith_worker_rss_bytes` + +Gauge. + +Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status. + +Labels: + - `pid` + + + diff --git a/docs/docs/deployment-self-hosting/observability/metrics.mdx b/docs/docs/deployment-self-hosting/observability/metrics.mdx index 3becac991b2c..1d02f3b5ad40 100644 --- a/docs/docs/deployment-self-hosting/observability/metrics.mdx +++ b/docs/docs/deployment-self-hosting/observability/metrics.mdx @@ -16,7 +16,12 @@ The metrics provided by Flagsmith are described below. +## Monitoring worker memory + +The `flagsmith_worker_rss_bytes` gauge reports the peak resident-set size for each worker process. See the +[Worker RSS monitoring guide](./worker-rss-monitoring) for PromQL examples, Grafana panels, and interpretation notes. + ## StatsD -The Flagsmith WSGI worker emits per-request access log metrics (request counts, durations, HTTP statuses) to StatsD -when configured. See [StatsD](/deployment-self-hosting/observability/monitoring#statsd) for setup. +The Flagsmith WSGI worker emits per-request access log metrics (request counts, durations, HTTP statuses) to StatsD when +configured. See [StatsD](/deployment-self-hosting/observability/monitoring#statsd) for setup. diff --git a/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md b/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md new file mode 100644 index 000000000000..2bb4a86e8883 --- /dev/null +++ b/docs/docs/deployment-self-hosting/observability/worker-rss-monitoring.md @@ -0,0 +1,129 @@ +--- +title: Worker RSS monitoring +sidebar_position: 15 +description: Track the peak memory of each Flagsmith API worker process with Prometheus and Grafana. +--- + +The `flagsmith_worker_rss_bytes` gauge exposes the peak resident-set size of every API worker process, labelled by +process ID. This is the most reliable signal for detecting workers that grow unboundedly (a leak) versus workers that +grow under load and stabilise. Use this page once you have Prometheus scraping configured — see +[Monitoring](./monitoring) for setup. + +## Overview + +A worker's RSS is the amount of physical memory the operating system currently has mapped for that process. Python-level +profilers tend to miss leaks that live in C extensions, page caches, or the allocator's free lists, so process-level RSS +is often the only reliable signal in production. + +`flagsmith_worker_rss_bytes` reports the **high-water mark** — the peak RSS observed for the worker since it started. +The value is read from the `VmHWM` line of `/proc/self/status`, which the Linux kernel maintains atomically. The metric +is updated once per HTTP request handled by the worker. + +The gauge has a single label, `pid`, identifying the worker process. When Flagsmith is deployed with multiple gunicorn +workers, you will see one time series per worker. + +## Enabling + +Set the environment variable: + +```bash +PROMETHEUS_ENABLED=true +``` + +This activates the `WorkerRSSMiddleware` that updates the gauge after each request. No further configuration is required +for single-process deployments. + +### Multi-worker deployments + +To aggregate metrics across gunicorn workers, set `PROMETHEUS_MULTIPROC_DIR` to a writable directory: + +```bash +PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus +``` + +The official Flagsmith Docker image sets this automatically. For bare-metal or custom-container deployments, configure +it yourself; otherwise the `/metrics` endpoint will only report data from whichever worker happened to handle the scrape +request. + +## Sample output + +Scraping `/metrics` on a Flagsmith API with two workers running yields output similar to: + +```text +# HELP flagsmith_worker_rss_bytes Maximum RSS (high-water mark) of the worker process in bytes, read from VmHWM in /proc/self/status. +# TYPE flagsmith_worker_rss_bytes gauge +flagsmith_worker_rss_bytes{pid="1234"} 4.8259072e+07 +flagsmith_worker_rss_bytes{pid="1235"} 5.2215808e+07 +``` + +Each `pid` corresponds to a live worker. Values are in bytes; the example above shows roughly 46 MiB and 50 MiB +respectively. + +## PromQL examples + +Useful queries to drop into dashboards or alerts. + +**Per-worker peak RSS (raw):** + +```promql +flagsmith_worker_rss_bytes +``` + +**Maximum peak across all workers:** + +```promql +max(flagsmith_worker_rss_bytes) +``` + +**Peak per worker over the last hour:** + +```promql +max_over_time(flagsmith_worker_rss_bytes[1h]) +``` + +**Growth indicator — peak RSS now minus peak RSS one hour ago:** + +```promql +flagsmith_worker_rss_bytes - flagsmith_worker_rss_bytes offset 1h +``` + +A consistently positive value across many workers and time windows points to a leak. A value that spikes once after a +deployment and then stays flat is normal — the workers grew under load and levelled off. + +## Grafana panel + +A reasonable starting point for a "Worker memory" panel: + +| Setting | Value | +| ------------- | -------------------------------------------- | +| Visualisation | Time series | +| Query | `flagsmith_worker_rss_bytes` | +| Legend | `{{pid}}` | +| Unit | bytes (IEC) — Grafana renders as KiB/MiB/GiB | +| Stacking | Disabled — each worker is independent | + +Add a second panel showing `max(flagsmith_worker_rss_bytes)` for a single-number overview. + +## Interpretation notes + +The metric is a high-water mark, not a current reading. Understanding the implications avoids false alerts. + +- **The value never decreases for a given PID.** Once a worker has peaked at a particular RSS, the gauge for that PID + will stay at that value until the worker process exits. Recovery is observed through PID rotation: when a worker is + recycled (for example, by gunicorn's `--max-requests` setting or by a deployment), the old PID's time series goes + stale and a new PID appears with a fresh, lower value. +- **Steady high RSS is normal after warm-up.** A worker that loads caches at startup will reach its steady-state peak + quickly and stay there. This appears as a flat line in Grafana, not a leak. +- **Periodic large workloads inflate the peak.** If a worker occasionally processes a large payload (for example, a bulk + export), the gauge will pin at that peak for the rest of the worker's lifetime even after the memory has been freed. + Investigate via PID rotation rather than waiting for the value to fall. +- **Leak signature.** A genuine leak shows up as the peak climbing across many worker restarts — every newly forked + worker reaches a higher peak than its predecessor. +- **Quirk: parent-process inheritance.** On Linux, the kernel may preserve the high-water mark across `execve()`, so a + freshly spawned worker can report a non-zero baseline inherited from its parent. Treat the first scrape after a + deployment as informational rather than a true zero. + +## Related documentation + +- [Metrics reference](./metrics) — full catalogue of exported Prometheus metrics. +- [Monitoring](./monitoring) — enabling `/metrics` and other vendor integrations.