Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/foundation/mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ void cbm_mem_init(double ram_fraction) {
mi_option_set(mi_option_arena_eager_commit, 0);
mi_option_set(mi_option_purge_decommits, SKIP_ONE);
mi_option_set(mi_option_purge_delay, 0); /* immediate purge, no 1s delay */
/* v3 (#832): reclaim abandoned pages on ANY thread's free (=1), restoring the
* v2 behaviour. mimalloc v3 defaults page_reclaim_on_free=0, so pages a worker
* thread abandons at exit are NOT reclaimed when the main thread later frees
* their blocks (and mi_collect cannot touch abandoned pages) — RSS then
* ratchets across repeated in-process index cycles. The supervised subprocess
* is the primary cure (the child returns 100% RSS on exit); this is the
* in-process fallback for any path that stays in-process (kill switch,
* spawn-fail degrade, embedders). */
mi_option_set(mi_option_page_reclaim_on_free, 1);

/* CBM_MEM_BUDGET_MB env override (memory analogue of CBM_WORKERS).
* Lets users cap the budget directly without an enclosing cgroup —
Expand Down
17 changes: 17 additions & 0 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ static int watcher_index_fn(const char *project_name, const char *root_path, voi

cbm_log_info("watcher.reindex", "project", project_name, "path", root_path);

/* #832: route the re-index through the supervised worker subprocess so this
* long-lived server process hands its RSS back to the OS on every cycle
* instead of ratcheting (mimalloc v3 does not reclaim pages that worker
* threads abandon at exit). The child writes the DB; the parent only needs the
* return code. The pipeline lock (already held) still serialises re-indexes.
* Degrade to the in-process pipeline when the supervisor is off (kill switch)
* or the spawn fails. */
if (cbm_index_supervisor_should_wrap()) {
char *resp = cbm_mcp_index_run_supervised_path(root_path);
if (resp) {
free(resp);
cbm_pipeline_unlock();
return 0;
}
/* resp == NULL → spawn-failure degrade → fall through to in-process. */
}

cbm_pipeline_t *p = cbm_pipeline_new(root_path, NULL, CBM_MODE_FULL);
if (!p) {
cbm_pipeline_unlock();
Expand Down
58 changes: 56 additions & 2 deletions src/mcp/mcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -3481,8 +3481,14 @@ static char *build_worker_failure_response(const char *args, cbm_proc_outcome_t
}

/* Drop the cached store so the next query reopens whatever the worker wrote (each
* worker is a fresh process that deletes + recreates the .db). */
* worker is a fresh process that deletes + recreates the .db). NULL-safe: the
* background watcher path (main.c) has no MCP server / cached store — the child
* writes the DB and the parent only needs the return code, so there is nothing
* to invalidate. */
static void supervisor_invalidate_store(cbm_mcp_server_t *srv) {
if (!srv) {
return;
}
if (srv->owns_store && srv->store) {
cbm_store_close(srv->store);
srv->store = NULL;
Expand Down Expand Up @@ -3687,6 +3693,34 @@ static char *index_run_supervised(cbm_mcp_server_t *srv, const char *args) {
return build_worker_failure_response(args, last_outcome);
}

/* Build a minimal {"repo_path": "<root>"} args object (path safely escaped) and
* run it through index_run_supervised. Shared by the session auto-index (srv
* present → its cached store is invalidated) and the watcher re-index (srv NULL).
* Returns the worker's response string (caller frees) or NULL to degrade. */
static char *index_run_supervised_path(cbm_mcp_server_t *srv, const char *root_path) {
if (!root_path || !root_path[0]) {
return NULL;
}
yyjson_mut_doc *doc = yyjson_mut_doc_new(NULL);
yyjson_mut_val *root = yyjson_mut_obj(doc);
yyjson_mut_doc_set_root(doc, root);
yyjson_mut_obj_add_strcpy(doc, root, "repo_path", root_path);
char *args = yy_doc_to_str(doc);
yyjson_mut_doc_free(doc);
if (!args) {
return NULL;
}
char *resp = index_run_supervised(srv, args);
free(args);
return resp;
}

/* Public entry (see mcp.h): the watcher re-index in main.c has no MCP server, so
* it reaches the supervised runner through this srv-less wrapper. */
char *cbm_mcp_index_run_supervised_path(const char *root_path) {
return index_run_supervised_path(NULL, root_path);
}

static char *handle_index_repository(cbm_mcp_server_t *srv, const char *args) {
/* Supervisor gate: run the index in a crash/hang-isolating worker subprocess
* unless this process IS the worker or the kill switch (CBM_INDEX_SUPERVISOR=0)
Expand Down Expand Up @@ -5662,6 +5696,26 @@ static void *autoindex_thread(void *arg) {

cbm_log_info("autoindex.start", "project", srv->session_project, "path", srv->session_root);

/* #832: prefer the supervised worker subprocess. Indexing the whole session in
* this long-lived server thread ratchets RSS (mimalloc v3 does not reclaim the
* pages worker threads abandon at exit); running it in a child that exits hands
* 100% of that memory back to the OS every cycle. Degrade to the in-process
* pipeline below when the supervisor is off (kill switch) or the spawn fails. */
if (cbm_index_supervisor_should_wrap()) {
char *resp = index_run_supervised_path(srv, srv->session_root);
if (resp) {
free(resp);
cbm_log_info("autoindex.done", "project", srv->session_project, "mode", "supervised");
/* Register with watcher for ongoing change detection — gated on
* auto_watch (#849), same as the in-process branch below. A bare
* `if (srv->watcher)` would register even when the user set
* `config set auto_watch false`, since srv->watcher is always set. */
register_watcher_if_enabled(srv);
return NULL;
}
/* resp == NULL → spawn-failure degrade → fall through to in-process. */
}

cbm_pipeline_t *p = cbm_pipeline_new(srv->session_root, NULL, CBM_MODE_FULL);
if (!p) {
cbm_log_warn("autoindex.err", "msg", "pipeline_create_failed");
Expand All @@ -5674,7 +5728,7 @@ static void *autoindex_thread(void *arg) {
cbm_pipeline_unlock();

cbm_pipeline_free(p);
cbm_mem_collect(); /* return mimalloc pages to OS after indexing */
cbm_mem_collect(); /* return mimalloc pages to OS after indexing (in-process only) */

if (rc == 0) {
cbm_log_info("autoindex.done", "project", srv->session_project);
Expand Down
12 changes: 12 additions & 0 deletions src/mcp/mcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ char *cbm_mcp_server_handle(cbm_mcp_server_t *srv, const char *line);
/* Handle a tools/call request. Returns MCP tool result JSON. */
char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const char *args_json);

/* ── Supervised background index (RSS isolation, #832) ────────── */

/* Run a full index of root_path in a supervised worker SUBPROCESS (the same
* crash/hang-isolating runner used by handle_index_repository), so the child
* returns 100% of its RSS to the OS on exit instead of ratcheting the long-lived
* parent. Builds {"repo_path": root_path} internally. Returns the worker's
* response string (caller frees) on success, or NULL to signal the caller must
* degrade to the in-process path (kill switch set, spawn failure, or the process
* is not a supervisor host). This is the shared entry the watcher re-index
* (main.c) and the session auto-index (mcp.c) route through. */
char *cbm_mcp_index_run_supervised_path(const char *root_path);

/* ── Idle store eviction ──────────────────────────────────────── */

/* Evict the cached project store if idle for more than timeout_s seconds.
Expand Down
135 changes: 135 additions & 0 deletions tests/repro/issue832_rss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
issue832_rss.py -- direct RSS reproduction for #832 (NON-GATING, manual repro tier).

WHY THIS IS NOT A C UNIT TEST
-----------------------------
The RSS ratchet is a property of mimalloc v3's abandoned-page handling
(page_reclaim_on_free=0): pages a worker THREAD abandons at exit are not
reclaimed when the main thread later frees their blocks. That only manifests in
the PROD binary, which links mimalloc as the global allocator (Makefile.cbm:
MI_OVERRIDE=1). The C test-runner and the C repro-runner are built CRT+ASan
(MI_OVERRIDE=0), so mimalloc is inert there and cbm_mem_rss() falls back to
os_rss() -- a C test would be VACUOUS. Hence this drives the real
`build/c/codebase-memory-mcp` server over stdio and samples its RSS from `ps`.

WHAT IT SHOWS
-------------
A long-lived MCP server is driven through K index_repository cycles of the same
fixture. The in-process pipeline (CBM_INDEX_SUPERVISOR=0) is the pre-#832-fix
background-path behaviour: RSS RATCHETS across cycles. The supervised subprocess
path (default) is the fix: each child returns 100% of its RSS on exit, so the
long-lived parent stays ~FLAT. The auto-index (mcp.c) and watcher re-index
(main.c) paths now route through that same supervised subprocess, so they inherit
this flat profile; the deterministic routing proof is the GATING guard
tests/test_mcp.c::index_bg_paths_route_through_supervisor_issue832.

Inherently noisy (allocator/OS dependent) -> thresholds are generous and this is
NOT wired into `make test` / `ci-ok`. Run manually:
make -f Makefile.cbm cbm
python3 tests/repro/issue832_rss.py
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile

ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BINARY = os.path.join(ROOT, "build", "c", "codebase-memory-mcp")
CYCLES = 10
NUM_FILES = 120 # enough files to fan out across worker threads (abandoned heaps)


def rss_kb(pid):
out = subprocess.check_output(["ps", "-o", "rss=", "-p", str(pid)])
return int(out.strip())


def make_fixture(d):
for i in range(NUM_FILES):
with open(os.path.join(d, f"mod_{i}.py"), "w") as f:
for j in range(20):
f.write(f"def fn_{i}_{j}(a, b):\n")
f.write(f" x = a + b + {i} * {j}\n")
f.write(" return x\n\n")


def run_series(repo, cache, supervised):
env = dict(os.environ)
env["CBM_CACHE_DIR"] = cache
if supervised:
env.pop("CBM_INDEX_SUPERVISOR", None)
else:
env["CBM_INDEX_SUPERVISOR"] = "0" # in-process (pre-fix background behaviour)
env["CBM_INDEX_WORKER_TIMEOUT_S"] = "120"

proc = subprocess.Popen(
[BINARY], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, env=env, text=True, bufsize=1,
)

def rpc(obj):
proc.stdin.write(json.dumps(obj) + "\n")
proc.stdin.flush()
return proc.stdout.readline()

rpc({"jsonrpc": "2.0", "id": 0, "method": "initialize", "params": {}})
series = []
for k in range(CYCLES):
rpc({"jsonrpc": "2.0", "id": k + 1, "method": "tools/call",
"params": {"name": "index_repository",
"arguments": {"repo_path": repo, "mode": "fast"}}})
series.append(rss_kb(proc.pid))
try:
proc.stdin.close()
proc.wait(timeout=15)
except Exception:
proc.kill()
return series


def main():
if not os.path.exists(BINARY):
print(f"missing prod binary: {BINARY}\n build it: make -f Makefile.cbm cbm")
return 2
base = tempfile.mkdtemp(prefix="cbm-832-")
repo = os.path.join(base, "repo")
os.makedirs(repo)
make_fixture(repo)
try:
inproc = run_series(repo, os.path.join(base, "c1"), supervised=False)
superv = run_series(repo, os.path.join(base, "c2"), supervised=True)
finally:
shutil.rmtree(base, ignore_errors=True)

def mb(kb):
return kb / 1024.0

print(f"cycles={CYCLES} files={NUM_FILES}")
print("cycle | in-process(MB) | supervised(MB)")
for i in range(CYCLES):
print(f" {i:2d} | {mb(inproc[i]):8.1f} | {mb(superv[i]):8.1f}")

ip_peak = max(mb(x) for x in inproc)
sv_peak = max(mb(x) for x in superv)
print(f"\nin-process peak resident: {ip_peak:8.1f} MB")
print(f"supervised peak resident: {sv_peak:8.1f} MB")
# The decisive, robust signal at laptop-fixture scale is the RESIDENT-LEVEL
# contrast, not cycle-over-cycle growth: the in-process server keeps the whole
# index working set resident (it never leaves the long-lived process), while
# the supervised path returns it every cycle (the child exits) -> the server
# stays near its idle baseline. The unbounded ratchet in the field (#832, GB
# over hours) is the same effect amplified by worker-thread count + cycle count
# beyond what a small fixture surfaces. Generous threshold; report-only,
# NON-GATING.
verdict = "SUPERVISED ISOLATION reproduced (server stays near baseline)" \
if sv_peak < ip_peak / 2 \
else "inconclusive (env-dependent; see numbers)"
print(f"verdict: {verdict}")
return 0


if __name__ == "__main__":
sys.exit(main())
72 changes: 71 additions & 1 deletion tests/test_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,74 @@ int tf_fail_count = 0;
int tf_skip_count = 0;

#include "test_framework.h"
#include "foundation/compat.h" /* cbm_setenv — #845 supervisor kill switch */
#include "foundation/compat.h" /* cbm_setenv — #845 supervisor kill switch */
#include "foundation/compat_fs.h" /* cbm_fopen — worker response file */
#include "foundation/mem.h" /* cbm_mem_init — worker budget */
#include "mcp/index_supervisor.h" /* cbm_index_set_worker_role */
#include "mcp/mcp.h" /* cbm_mcp_handle_tool — act as a real worker */
#include <sqlite3.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* #832 guard support: when the index supervisor spawns THIS binary as
* `<self> cli --index-worker index_repository <args_json> --response-out <file>`
* (exactly the argv cbm_index_spawn_worker builds), act as a faithful in-process
* index worker instead of re-running the test suites. This lets the deterministic
* gating guard (test_mcp.c) spawn a REAL worker child that indexes the fixture and
* writes its response back, using only public APIs — no production test seam.
* Returns an exit code (>=0) when it handled a worker invocation, else -1. */
static int tf_maybe_run_index_worker(int argc, char **argv) {
if (argc < 2 || strcmp(argv[1], "cli") != 0) {
return -1;
}
bool is_worker = false;
const char *tool = NULL;
const char *args_json = "{}";
const char *response_out = NULL;
for (int i = 2; i < argc; i++) {
if (strcmp(argv[i], "--index-worker") == 0) {
is_worker = true;
} else if (strcmp(argv[i], "--response-out") == 0) {
if (i + 1 < argc) {
response_out = argv[++i];
}
} else if (argv[i][0] == '{') {
args_json = argv[i];
} else if (argv[i][0] != '-' && !tool) {
tool = argv[i];
}
}
if (!is_worker) {
return -1;
}
if (!tool) {
tool = "index_repository";
}

cbm_mem_init(0.5);
cbm_index_set_worker_role(true, response_out); /* worker role → index in-process */
cbm_mcp_server_t *srv = cbm_mcp_server_new(NULL);
if (!srv) {
return 1;
}
char *result = cbm_mcp_handle_tool(srv, tool, args_json);
if (result) {
const char *ro = cbm_index_worker_response_out();
if (ro) {
FILE *rf = cbm_fopen(ro, "wb");
if (rf) {
(void)fputs(result, rf);
(void)fclose(rf);
}
}
free(result);
}
cbm_mcp_server_free(srv);
return 0;
}

static int g_suite_argc = 0;
static char **g_suite_argv = NULL;

Expand Down Expand Up @@ -135,6 +198,13 @@ extern void suite_dump_verify_io(void);
extern void cbm_kind_in_set_free_cache(void);

int main(int argc, char **argv) {
/* #832: if spawned as a supervised index worker, do the real work and exit
* before any suite runs (see tf_maybe_run_index_worker). */
int worker_rc = tf_maybe_run_index_worker(argc, argv);
if (worker_rc >= 0) {
return worker_rc;
}

/* #845 belt-and-suspenders: this binary EMBEDS cbm_mcp_handle_tool. The
* supervisor gate already ignores unmarked hosts, but pin the kill switch
* too so even a future supervisor-marked test host can never resolve THIS
Expand Down
Loading
Loading