Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 52 additions & 52 deletions internal/jobs/deploy_lifecycle_coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1984,17 +1984,17 @@ func TestOrphanSweep_Pass5_StackListError(t *testing.T) {
}
}

// TestOrphanSweep_Pass5_StackIDsQueryError exercises the live-stack-id
// TestOrphanSweep_Pass5_StackSlugsQueryError exercises the live-stack-slug
// query DB-blip — must fail-open.
func TestOrphanSweep_Pass5_StackIDsQueryError(t *testing.T) {
func TestOrphanSweep_Pass5_StackSlugsQueryError(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()
mock.ExpectQuery(`SELECT d.app_id, d.status, t.status, d.created_at\s+FROM deployments d\s+JOIN teams t`).
WillReturnRows(sqlmock.NewRows([]string{"app_id", "d_status", "t_status", "created_at"}))
mock.ExpectQuery(`SELECT id::text\s+FROM stacks`).
mock.ExpectQuery(`SELECT slug\s+FROM stacks`).
WillReturnError(errors.New("conn lost"))

lister := newFakeNamespaceLister().withStackNamespaces(ExpireStacksNamespacePrefix + "stack-1")
Expand Down Expand Up @@ -2091,8 +2091,8 @@ func TestOrphanSweep_Pass5_DeleteFails(t *testing.T) {
orphanNS := ExpireStacksNamespacePrefix + "willfail"
mock.ExpectQuery(`SELECT d.app_id, d.status, t.status, d.created_at\s+FROM deployments d\s+JOIN teams t`).
WillReturnRows(sqlmock.NewRows([]string{"app_id", "d_status", "t_status", "created_at"}))
mock.ExpectQuery(`SELECT id::text\s+FROM stacks`).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectQuery(`SELECT slug\s+FROM stacks`).
WillReturnRows(sqlmock.NewRows([]string{"slug"}))

lister := newFakeNamespaceLister().withStackNamespaces(orphanNS)
lister.failOn = map[string]error{orphanNS: errors.New("forbidden")}
Expand Down Expand Up @@ -2314,76 +2314,76 @@ func TestOrphanSweep_FetchLiveResourceTokens_ScanError(t *testing.T) {
}
}

// TestOrphanSweep_FetchLiveStackIDs_ScanError exercises the scan-failure
// TestOrphanSweep_FetchLiveStackSlugs_ScanError exercises the scan-failure
// branch for the stacks query.
func TestOrphanSweep_FetchLiveStackIDs_ScanError(t *testing.T) {
func TestOrphanSweep_FetchLiveStackSlugs_ScanError(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()
mock.ExpectQuery(`SELECT id::text\s+FROM stacks`).
WillReturnRows(sqlmock.NewRows([]string{"id", "extra"}).
AddRow("id", "extra"))
mock.ExpectQuery(`SELECT slug\s+FROM stacks`).
WillReturnRows(sqlmock.NewRows([]string{"slug", "extra"}).
AddRow("slug", "extra"))

w := &OrphanSweepReconciler{db: db}
_, err = w.fetchLiveStackIDs(context.Background())
_, err = w.fetchLiveStackSlugs(context.Background())
if err == nil {
t.Fatal("expected scan error from wrong column count")
}
}

// TestOrphanSweep_FetchLiveStackIDs_KeysetPagination proves the bug-bash
// 2026-06-03 fix: fetchLiveStackIDs no longer issues one unbounded SELECT but
// streams the live stack ids in keyset-paginated batches. The first page is
// FULL (== orphanLiveIDsBatchLimit rows) so the loop must issue a SECOND query
// whose cursor ($1) is the last id of page 1; the second page is short, ending
// the loop. The assertion: every id from BOTH pages lands in the returned set,
// AND the second query's keyset arg equals page 1's tail (proving the cursor
// advanced rather than re-scanning from the start).
func TestOrphanSweep_FetchLiveStackIDs_KeysetPagination(t *testing.T) {
// TestOrphanSweep_FetchLiveStackSlugs_KeysetPagination proves
// fetchLiveStackSlugs streams the live stack slugs in keyset-paginated
// batches. The first page is FULL (== orphanLiveIDsBatchLimit rows) so the
// loop must issue a SECOND query whose cursor ($1) is the last slug of page 1;
// the second page is short, ending the loop. The assertion: every slug from
// BOTH pages lands in the returned set, AND the second query's keyset arg
// equals page 1's tail (proving the cursor advanced rather than re-scanning
// from the start).
func TestOrphanSweep_FetchLiveStackSlugs_KeysetPagination(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()

// Page 1: exactly orphanLiveIDsBatchLimit rows, zero-padded so they sort
// lexicographically in the same order we add them. The last id is the
// lexicographically in the same order we add them. The last slug is the
// keyset cursor the second query must carry.
page1 := sqlmock.NewRows([]string{"id"})
var lastPage1ID string
page1 := sqlmock.NewRows([]string{"slug"})
var lastPage1Slug string
for i := 0; i < orphanLiveIDsBatchLimit; i++ {
id := fmt.Sprintf("stack-%06d", i)
page1.AddRow(id)
lastPage1ID = id
slug := fmt.Sprintf("stk-%06d", i)
page1.AddRow(slug)
lastPage1Slug = slug
}
queryRE := `SELECT id::text\s+FROM stacks\s+WHERE id::text > \$1\s+ORDER BY id::text ASC\s+LIMIT \$2`
queryRE := `SELECT slug\s+FROM stacks\s+WHERE slug > \$1\s+ORDER BY slug ASC\s+LIMIT \$2`
mock.ExpectQuery(queryRE).
WithArgs("", orphanLiveIDsBatchLimit).
WillReturnRows(page1)
// Page 2: short (2 rows < limit) → loop terminates. The cursor MUST be
// page 1's tail id.
// page 1's tail slug.
mock.ExpectQuery(queryRE).
WithArgs(lastPage1ID, orphanLiveIDsBatchLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("stack-overflow-a").
AddRow("stack-overflow-b"))
WithArgs(lastPage1Slug, orphanLiveIDsBatchLimit).
WillReturnRows(sqlmock.NewRows([]string{"slug"}).
AddRow("stk-overflow-a").
AddRow("stk-overflow-b"))

w := &OrphanSweepReconciler{db: db}
got, err := w.fetchLiveStackIDs(context.Background())
got, err := w.fetchLiveStackSlugs(context.Background())
if err != nil {
t.Fatalf("fetchLiveStackIDs: %v", err)
t.Fatalf("fetchLiveStackSlugs: %v", err)
}
wantCount := orphanLiveIDsBatchLimit + 2
if len(got) != wantCount {
t.Fatalf("live id count = %d; want %d (both pages merged)", len(got), wantCount)
t.Fatalf("live slug count = %d; want %d (both pages merged)", len(got), wantCount)
}
if !got[lastPage1ID] {
t.Errorf("page 1 tail id %q missing from set", lastPage1ID)
if !got[lastPage1Slug] {
t.Errorf("page 1 tail slug %q missing from set", lastPage1Slug)
}
if !got["stack-overflow-a"] || !got["stack-overflow-b"] {
t.Errorf("page 2 ids missing from set: %v", got)
if !got["stk-overflow-a"] || !got["stk-overflow-b"] {
t.Errorf("page 2 slugs missing from set: %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
// Unmet expectation here = the second keyset query never fired (the
Expand All @@ -2392,50 +2392,50 @@ func TestOrphanSweep_FetchLiveStackIDs_KeysetPagination(t *testing.T) {
}
}

// TestOrphanSweep_FetchLiveStackIDs_SecondPageError proves a DB error on a
// TestOrphanSweep_FetchLiveStackSlugs_SecondPageError proves a DB error on a
// LATER keyset page (not just the first) propagates out — the loop must not
// silently return a partial set, which for PASS 5 could wrongly mark a live
// stack's namespace as an orphan.
func TestOrphanSweep_FetchLiveStackIDs_SecondPageError(t *testing.T) {
func TestOrphanSweep_FetchLiveStackSlugs_SecondPageError(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()

page1 := sqlmock.NewRows([]string{"id"})
page1 := sqlmock.NewRows([]string{"slug"})
for i := 0; i < orphanLiveIDsBatchLimit; i++ {
page1.AddRow(fmt.Sprintf("stack-%06d", i))
page1.AddRow(fmt.Sprintf("stk-%06d", i))
}
queryRE := `SELECT id::text\s+FROM stacks\s+WHERE id::text > \$1\s+ORDER BY id::text ASC\s+LIMIT \$2`
queryRE := `SELECT slug\s+FROM stacks\s+WHERE slug > \$1\s+ORDER BY slug ASC\s+LIMIT \$2`
mock.ExpectQuery(queryRE).WillReturnRows(page1)
mock.ExpectQuery(queryRE).WillReturnError(errors.New("conn lost mid-sweep"))

w := &OrphanSweepReconciler{db: db}
if _, err := w.fetchLiveStackIDs(context.Background()); err == nil {
if _, err := w.fetchLiveStackSlugs(context.Background()); err == nil {
t.Fatal("expected error from second-page query failure, got nil (partial set must NOT be returned)")
}
}

// TestOrphanSweep_FetchLiveStackIDs_RowsErr proves a row-iteration error
// TestOrphanSweep_FetchLiveStackSlugs_RowsErr proves a row-iteration error
// (rows.Err() non-nil — e.g. the connection drops mid-stream) propagates out
// rather than silently truncating the live-id set. Distinct from a
// rather than silently truncating the live-slug set. Distinct from a
// QueryContext error: this fires AFTER rows start streaming.
func TestOrphanSweep_FetchLiveStackIDs_RowsErr(t *testing.T) {
func TestOrphanSweep_FetchLiveStackSlugs_RowsErr(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()

rows := sqlmock.NewRows([]string{"id"}).
AddRow("stack-aaaa").
rows := sqlmock.NewRows([]string{"slug"}).
AddRow("stk-aaaa").
RowError(0, errors.New("conn reset mid-stream"))
mock.ExpectQuery(`SELECT id::text\s+FROM stacks\s+WHERE id::text > \$1`).
mock.ExpectQuery(`SELECT slug\s+FROM stacks\s+WHERE slug > \$1`).
WillReturnRows(rows)

w := &OrphanSweepReconciler{db: db}
if _, err := w.fetchLiveStackIDs(context.Background()); err == nil {
if _, err := w.fetchLiveStackSlugs(context.Background()); err == nil {
t.Fatal("expected rows.Err() to propagate, got nil")
}
}
Expand Down
108 changes: 60 additions & 48 deletions internal/jobs/orphan_sweep_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,15 @@ const (
// several ticks rather than spamming the k8s API in one burst.
orphanStuckBuildBatchLimit = 25

// orphanLiveIDsBatchLimit caps how many ids fetchLiveStackIDs pulls per
// round-trip. The full live-id set is still materialized into the
// orphanLiveIDsBatchLimit caps how many slugs fetchLiveStackSlugs pulls
// per round-trip. The full live-slug set is still materialized into the
// returned map (PASS 5 needs the complete set to decide orphan-hood),
// but the rows are streamed in keyset-paginated batches rather than one
// unbounded SELECT — bounding the server-side cursor + per-fetch memory
// so a stacks table that grows to tens of thousands of rows cannot
// pin a multi-MB result set in one allocation. Keyset (id > $1 ORDER BY
// id) is restart-safe and index-friendly (PK scan, no OFFSET drift).
// pin a multi-MB result set in one allocation. Keyset (slug > $1 ORDER BY
// slug) is restart-safe and index-friendly (idx_stacks_slug, no OFFSET
// drift).
orphanLiveIDsBatchLimit = 1000
)

Expand Down Expand Up @@ -870,26 +871,29 @@ func (w *OrphanSweepReconciler) fetchLiveResourceTokens(ctx context.Context) (ma

// ── PASS 5 — orphaned k8s stack namespaces (T6 P0-1) ─────────────────────

// sweepOrphanedStackNamespaces lists every instant-stack-<id> namespace and
// sweepOrphanedStackNamespaces lists every instant-stack-<slug> namespace and
// deletes the ones whose backing `stacks` row is gone — the durable fix for
// the T6 P0-1 leak (BugBash 2026-05-20).
//
// THE LEAK THIS CLOSES. The pre-fix ExpireStacksWorker carried nsPrefix
// "instant-apps-" (derived from cfg.KubeNamespaceApps), but real stack
// namespaces are "instant-stack-<id>". The safety guard in
// namespaces are "instant-stack-<slug>". The safety guard in
// deleteK8sNamespace refused every real stack namespace (returning
// nil-success), and ExpireStacksWorker then DELETE'd the `stacks` row,
// leaving the namespace + pods + service + ingress + TLS cert running
// indefinitely with NO DB pointer ever again. PASS 5 is the recurrence
// guard plus the catch-up sweep for pre-fix orphans.
//
// SAFETY. A namespace is deleted ONLY when its <id> has no row in the
// `stacks` table (the row was hard-deleted by the buggy expirer). A
// namespace whose row is still present — even in terminal status — is left
// alone so the per-stack teardown path stays in charge of it.
// SAFETY. A namespace is deleted ONLY when its <slug> has no row in the
// `stacks` table (the row was hard-deleted by the buggy expirer). The token
// after the prefix is the SLUG (namespace = "instant-stack-{slug}", migration
// 004), NOT the UUID id — comparing it against the id set reaped every live
// stack (the P0 this corrects). A namespace whose row is still present — even
// in terminal status — is left alone so the per-stack teardown path stays in
// charge of it.
//
// FAIL-OPEN. Identical to PASS 3/4: a namespace List failure or a DB blip
// on the live-stack-ids query degrades to one WARN and a zero-orphan
// on the live-stack-slugs query degrades to one WARN and a zero-orphan
// result. The pass never returns an error — PASS 1/2/3/4 have already run.
func (w *OrphanSweepReconciler) sweepOrphanedStackNamespaces(ctx context.Context) (deleted, failed int) {
if w.k8s == nil {
Expand All @@ -909,29 +913,31 @@ func (w *OrphanSweepReconciler) sweepOrphanedStackNamespaces(ctx context.Context
return 0, 0
}

liveStackIDs, err := w.fetchLiveStackIDs(ctx)
liveStackSlugs, err := w.fetchLiveStackSlugs(ctx)
if err != nil {
// A DB blip on the live-stack-ids query must NOT cause a delete
// A DB blip on the live-stack-slugs query must NOT cause a delete
// decision off an empty set — that would tear down every stack
// namespace. Skip the pass this sweep.
slog.Warn("jobs.orphan_sweep.pass5_live_stack_ids_failed",
slog.Warn("jobs.orphan_sweep.pass5_live_stack_slugs_failed",
"error", err.Error(),
"detail", "stack-namespace orphan cleanup skipped this sweep; PASS 1/2/3/4 still ran")
return 0, 0
}

for _, ns := range namespaces {
// The id portion is everything after the "instant-stack-" prefix.
// Stack IDs are UUIDs; we don't parse here — the in-Go string
// comparison against the live-ids set is exact.
stackID := ns[len(ExpireStacksNamespacePrefix):]
if stackID == "" {
// The token after the "instant-stack-" prefix is the stack's SLUG,
// NOT its UUID id. The api stack provider builds the namespace as
// "instant-stack-{slug}" (migration 004: `slug TEXT UNIQUE NOT NULL
// -- short ID used in namespace, URLs`; e.g. "stk-4abb338c"). The
// in-Go string comparison against the live-SLUG set is exact.
stackSlug := ns[len(ExpireStacksNamespacePrefix):]
if stackSlug == "" {
continue
}
if liveStackIDs[stackID] {
if liveStackSlugs[stackSlug] {
continue // a row still owns this namespace — leave it
}
// Orphan: no stacks row for this id.
// Orphan: no stacks row for this slug.
if delErr := w.k8s.DeleteNamespace(ctx, ns); delErr != nil {
failed++
metrics.OrphanSweepReapFailedTotal.WithLabelValues(orphanReapReasonStackNoRow).Inc()
Expand All @@ -951,52 +957,58 @@ func (w *OrphanSweepReconciler) sweepOrphanedStackNamespaces(ctx context.Context
return deleted, failed
}

// fetchLiveStackIDs returns the set of stack ids that still have a row in
// the `stacks` table. Note: unlike PASS 4 (resources), we do NOT filter on
// status — even a terminal-status stacks row pins its namespace so the
// per-stack teardown path owns the delete. The pass is a strict "no row at
// all = orphan" sweep.
// fetchLiveStackSlugs returns the set of stack SLUGS that still have a row in
// the `stacks` table. The slug — NOT the UUID id — is the token PASS 5 compares
// against, because the api stack provider builds the namespace as
// "instant-stack-{slug}" (migration 004: `slug TEXT UNIQUE NOT NULL`; the
// table's `id` UUID PK is a separate column never embedded in the namespace).
// Keying the liveness set by `id` here judged EVERY live stack namespace as an
// orphan (slug ∉ UUID set) and reaped it within minutes — the P0 this fixes.
//
// Batching (bug bash 2026-06-03): the previous `SELECT id::text FROM stacks`
// loaded the ENTIRE stacks table into one result set/allocation. This now
// streams the ids in keyset-paginated batches of orphanLiveIDsBatchLimit
// (WHERE id > $1 ORDER BY id LIMIT $2), so the server-side cursor + per-fetch
// memory stay bounded regardless of table size. The complete set is still
// returned — PASS 5 must see every live id to avoid deleting a live
// namespace — but it is assembled incrementally rather than in one shot.
// Note: unlike PASS 4 (resources), we do NOT filter on status — even a
// terminal-status stacks row pins its namespace so the per-stack teardown path
// owns the delete. The pass is a strict "no row at all = orphan" sweep.
//
// Batching: the slugs are streamed in keyset-paginated batches of
// orphanLiveIDsBatchLimit (WHERE slug > $1 ORDER BY slug LIMIT $2), so the
// server-side cursor + per-fetch memory stay bounded regardless of table size.
// The complete set is still returned — PASS 5 must see every live slug to avoid
// deleting a live namespace — but it is assembled incrementally. slug is TEXT
// UNIQUE with index idx_stacks_slug (migration 004), so the keyset scan is
// index-friendly.
//
// Keyset over OFFSET: an OFFSET sweep re-scans skipped rows each page and can
// skip/duplicate ids if rows are inserted/deleted mid-sweep; the (id > last)
// predicate rides the primary-key index and is stable under concurrent writes
// (a brand-new stack id either sorts after the cursor — seen this sweep — or
// skip/duplicate slugs if rows are inserted/deleted mid-sweep; the (slug > last)
// predicate rides the slug index and is stable under concurrent writes (a
// brand-new stack slug either sorts after the cursor — seen this sweep — or
// before it — already seen; either way it lands in the set). Newly-inserted
// stacks during the sweep are the conservative case for PASS 5 anyway: a
// missed live id can only ever PRESERVE a namespace, never wrongly delete one.
func (w *OrphanSweepReconciler) fetchLiveStackIDs(ctx context.Context) (map[string]bool, error) {
// missed live slug can only ever PRESERVE a namespace, never wrongly delete one.
func (w *OrphanSweepReconciler) fetchLiveStackSlugs(ctx context.Context) (map[string]bool, error) {
out := make(map[string]bool)
lastID := "" // keyset cursor: empty string sorts before every real id
lastSlug := "" // keyset cursor: empty string sorts before every real slug
for {
rows, err := w.db.QueryContext(ctx, `
SELECT id::text
SELECT slug
FROM stacks
WHERE id::text > $1
ORDER BY id::text ASC
WHERE slug > $1
ORDER BY slug ASC
LIMIT $2
`, lastID, orphanLiveIDsBatchLimit)
`, lastSlug, orphanLiveIDsBatchLimit)
if err != nil {
return nil, err
}
batchCount := 0
for rows.Next() {
var id string
if scanErr := rows.Scan(&id); scanErr != nil {
var slug string
if scanErr := rows.Scan(&slug); scanErr != nil {
_ = rows.Close()
return nil, scanErr
}
batchCount++
lastID = id
if id != "" {
out[id] = true
lastSlug = slug
if slug != "" {
out[slug] = true
}
}
if rowsErr := rows.Err(); rowsErr != nil {
Expand Down
Loading
Loading