From b3c8e91db1ed6d39265450e62010ad67a177d730 Mon Sep 17 00:00:00 2001 From: Manas Srivastava Date: Wed, 10 Jun 2026 09:27:53 +0530 Subject: [PATCH] =?UTF-8?q?fix(worker):=20R1=20=E2=80=94=20drive=20custome?= =?UTF-8?q?r-backup=20cadence=20off=20plans.yaml=20rpo=5Fminutes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The customer-Postgres backup scheduler chose hourly-vs-daily cadence from a hardcoded `switch canonicalTier()` plus a hardcoded SQL tier allow-list. The hourly tiers (pro/growth/team) happened to match their advertised `rpo_minutes:60`, but the coupling was implicit: changing rpo_minutes in plans.yaml would NOT have moved the cadence, so the product could silently over-promise RPO (effective ~24h vs sold 60m). Make the cadence RPO-driven from plans.yaml (the same field surfaced on /api/v1/capabilities), so the cadence that MAKES the RPO true is read from the field that PROMISES it — they can no longer drift: rpo_minutes in [1,60] → hourly (pro/growth/team) rpo_minutes > 60 → once-daily team slot (hobby/hobby_plus = 1440) rpo_minutes == 0 OR backup_retention_days == 0 → never (anonymous/free) - BackupPlanRegistry grows RPOMinutes(tier); the common-plans adapter and the test fake implement it. Registry.RPOMinutes already existed in common/plans. - Scheduler takes the registry (wired from the same `backupPlans` the runner uses); nil registry falls back to the legacy hardcoded mapping so a misconfigured boot never silently stops paid backups. - Candidate SELECT now excludes only the never-backup tiers (anonymous/free) instead of an explicit paid-tier allow-list, so a NEW paid tier in plans.yaml is covered automatically — removing the single-site-list failure mode (root rule 18) that once dropped hobby_plus + every _yearly variant. - Per-row retention==0 guard is a second, independent veto (defence-in-depth) so a stray rpo_minutes on a zero-retention tier still never enqueues. - Idempotency unchanged: the atomic INSERT … WHERE NOT EXISTS (50-min DB lookback) still prevents a duplicate enqueue within the hour even across River RunOnStart double-ticks (dedupe lives in the DB, not River UniqueOpts). Tests prove: a pro resource becomes due every hour (all 24 hours), a free resource is never enqueued (SQL-filtered AND registry-gated if it leaks), no duplicate enqueue within an hour, the [1,60]→hourly / >60→daily boundary, the zero-retention veto, the nil-registry legacy fallback, and the adapter's RPOMinutes delegation against the real embedded plans.yaml. jobs package at 96.0% coverage; all new/changed functions at 100%. Co-Authored-By: Claude Opus 4.8 --- internal/jobs/backup_extra_test.go | 39 +- internal/jobs/backup_s3.go | 25 +- internal/jobs/coverage_gaps_test.go | 4 +- internal/jobs/customer_backup_runner_test.go | 19 + internal/jobs/customer_backup_scheduler.go | 194 ++++++--- .../jobs/customer_backup_scheduler_test.go | 396 ++++++++++++++++-- internal/jobs/workers.go | 6 +- 7 files changed, 574 insertions(+), 109 deletions(-) diff --git a/internal/jobs/backup_extra_test.go b/internal/jobs/backup_extra_test.go index 6952406..2a7dbff 100644 --- a/internal/jobs/backup_extra_test.go +++ b/internal/jobs/backup_extra_test.go @@ -121,6 +121,22 @@ func TestCommonPlanRegistryAdapter_Delegates(t *testing.T) { if d := adapter.BackupRetentionDays("pro"); d <= 0 { t.Errorf("BackupRetentionDays(pro) = %d; want > 0", d) } + // RPOMinutes delegation — the scheduler cadence gate reads this. Pin the + // contract against the real embedded plans.yaml: pro/growth/team promise + // a 60-minute RPO (→ hourly cadence) while anonymous promises 0 (→ never + // backed up). A plans.yaml edit that breaks either trips here. + for _, tier := range []string{"pro", "growth", "team"} { + if m := adapter.RPOMinutes(tier); m != 60 { + t.Errorf("RPOMinutes(%q) = %d; want 60 (hourly-cadence promise)", tier, m) + } + } + if m := adapter.RPOMinutes("anonymous"); m != 0 { + t.Errorf("RPOMinutes(anonymous) = %d; want 0 (never backed up)", m) + } + // hobby promises a coarser daily RPO (1440) → daily cadence. + if m := adapter.RPOMinutes("hobby"); m <= 60 { + t.Errorf("RPOMinutes(hobby) = %d; want > 60 (daily cadence)", m) + } names := adapter.TierNames() if len(names) == 0 { t.Fatal("TierNames returned empty slice") @@ -719,8 +735,8 @@ func TestRunner_ProcessBackup_BadAESKey(t *testing.T) { w := &CustomerBackupRunnerWorker{ db: db, store: newFakeBackupStore(), pgDump: &fakePgDump{}, bucket: "b", prefix: "p", - aesKey: "not-hex-not-valid-please-fail", - now: time.Now, timeout: time.Minute, batchN: backupBatchSize, + aesKey: "not-hex-not-valid-please-fail", + now: time.Now, timeout: time.Minute, batchN: backupBatchSize, } if err := w.Work(context.Background(), fakeRunnerJob()); err != nil { t.Fatalf("Work: %v", err) @@ -922,12 +938,11 @@ func TestCustomerBackupSchedulerArgs_Kind(t *testing.T) { } } -// TestScheduler_AnonymousTier_DoesNotInsert — defensive: an anonymous row -// in resource.tier slips the SQL filter (it shouldn't, but the cadence -// switch also gates it). canonicalTier returns "anonymous"; the switch -// has no case for it, so the row proceeds to the dedupe INSERT — which -// is fine because the SQL filter excludes anonymous-tier rows in the -// first place. This test pins that contract via the SELECT shape. +// TestScheduler_AnonymousTier_DoesNotInsert — defensive: anonymous rows are +// excluded by the SQL WHERE clause (`tier NOT IN ('anonymous','free')`), so +// the candidate set is empty and no INSERT fires. Even if one leaked through, +// the registry cadence gate (rpo_minutes:0 → cadenceNever) skips it. This +// test pins the SQL-exclusion contract via the empty SELECT. func TestScheduler_AnonymousTier_DoesNotInsert(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) if err != nil { @@ -939,7 +954,7 @@ func TestScheduler_AnonymousTier_DoesNotInsert(t *testing.T) { mock.ExpectQuery(`SELECT r\.id::text`). WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"})) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 0, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { t.Fatalf("Work: %v", err) @@ -961,7 +976,7 @@ func TestScheduler_HobbyMissingTeamID_Skips(t *testing.T) { AddRow(resID, "hobby", nil)) // No INSERT expected. - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 0, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { t.Fatalf("Work: %v", err) @@ -987,7 +1002,7 @@ func TestScheduler_InsertError_LoggedNonFatal(t *testing.T) { WithArgs(uuid.MustParse(resID), "pro"). WillReturnError(errors.New("db hiccup")) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { t.Errorf("Work: per-row insert error must be non-fatal: %v", err) @@ -1010,7 +1025,7 @@ func TestScheduler_BadUUIDInRow_Skipped(t *testing.T) { AddRow("not-a-uuid", "pro", teamID)) // No INSERT expected — bad UUID short-circuits the per-row body. - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { t.Fatalf("Work: %v", err) diff --git a/internal/jobs/backup_s3.go b/internal/jobs/backup_s3.go index 59093b9..bf9907f 100644 --- a/internal/jobs/backup_s3.go +++ b/internal/jobs/backup_s3.go @@ -4,10 +4,10 @@ // We keep this as a tiny interface (Upload / Download / Delete / List) instead // of passing *minio.Client around directly so: // -// 1. The runner / restore-runner / retention sweep tests can use a fake that -// exercises the exact streaming path without dialing a real S3. -// 2. A future cutover from MinIO to a real DO-Spaces SDK (or AWS SDK v2) is a -// one-file change — every consumer of this interface stays the same. +// 1. The runner / restore-runner / retention sweep tests can use a fake that +// exercises the exact streaming path without dialing a real S3. +// 2. A future cutover from MinIO to a real DO-Spaces SDK (or AWS SDK v2) is a +// one-file change — every consumer of this interface stays the same. // // All four methods take the bucket as an explicit argument so the same client // can serve a separate retention sweep on a different bucket later (e.g. @@ -44,6 +44,16 @@ type BackupPlanRegistry interface { // tier — e.g. a Pro→Free downgrade — cannot stick around past // policy). BackupRetentionDays(tier string) int + // RPOMinutes returns the per-tier Recovery Point Objective in minutes + // from plans.yaml. This is the SOURCE OF TRUTH the backup SCHEDULER + // uses to pick a cadence: a tier promising rpo_minutes<=60 must be + // backed up hourly (else the effective RPO is ~24h and the product + // over-promises), a tier with rpo_minutes>60 gets the once-daily slot, + // and rpo_minutes==0 ("not promised" — anonymous/free) is never + // enqueued. Keeping the cadence derived from this value (rather than a + // hardcoded tier list) means changing rpo_minutes in plans.yaml + // automatically moves the cadence, with no scheduler code change. + RPOMinutes(tier string) int // TierNames lists the tier names the sweep should iterate. We sweep // per-tier because the SQL hits a partial index on tier_at_backup; // iterating an explicit list (rather than scanning DISTINCT) keeps @@ -80,6 +90,13 @@ func (a *commonPlanRegistryAdapter) BackupRetentionDays(tier string) int { return a.reg.BackupRetentionDays(tier) } +// RPOMinutes delegates to the common Registry. Returns 0 for unknown tiers +// (common's Get falls back to "anonymous", whose RPO is 0 / no scheduled +// backups). The scheduler reads this to choose hourly vs daily cadence. +func (a *commonPlanRegistryAdapter) RPOMinutes(tier string) int { + return a.reg.RPOMinutes(tier) +} + // TierNames returns every tier name registered in plans.yaml. Sorted // is unnecessary — the sweep is order-independent — but stable across // process lifetime so log lines per tier read predictably. diff --git a/internal/jobs/coverage_gaps_test.go b/internal/jobs/coverage_gaps_test.go index 78a974f..339857b 100644 --- a/internal/jobs/coverage_gaps_test.go +++ b/internal/jobs/coverage_gaps_test.go @@ -728,7 +728,7 @@ func TestScheduler_Work_ScanError_SkipsRow(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). AddRow("fffffff0-1111-2222-3333-444444444444", "pro", "not-a-uuid")) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { t.Fatalf("Work should skip unscannable row, got %v", err) @@ -746,7 +746,7 @@ func TestScheduler_Work_RowsError_ReturnsError(t *testing.T) { RowError(0, errors.New("rows boom")) mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`).WillReturnRows(rows) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err == nil || !strings.Contains(err.Error(), "rows error") { diff --git a/internal/jobs/customer_backup_runner_test.go b/internal/jobs/customer_backup_runner_test.go index 71e6263..88a3731 100644 --- a/internal/jobs/customer_backup_runner_test.go +++ b/internal/jobs/customer_backup_runner_test.go @@ -450,6 +450,7 @@ func TestBackupObjectKey(t *testing.T) { // flips a tier, the test fails on the assertion, not on a moved goalpost. type fakeBackupPlanRegistry struct { days map[string]int + rpo map[string]int // tier→rpo_minutes for the scheduler cadence gate tiers []string } @@ -460,6 +461,24 @@ func (f *fakeBackupPlanRegistry) BackupRetentionDays(tier string) int { return 0 } +// RPOMinutes returns the per-tier RPO the scheduler uses to pick cadence. +// When the test didn't declare an rpo map (runner tests don't care), fall +// back to deriving a sane value from retention days so those fakes keep +// satisfying the interface: any tier that takes backups (days>0) reports a +// 60-minute RPO (hourly), tiers with 0 retention report 0 (never). +func (f *fakeBackupPlanRegistry) RPOMinutes(tier string) int { + if f.rpo != nil { + if m, ok := f.rpo[tier]; ok { + return m + } + return 0 + } + if f.BackupRetentionDays(tier) > 0 { + return 60 + } + return 0 +} + func (f *fakeBackupPlanRegistry) TierNames() []string { return f.tiers } // TestRetentionDaysFromRegistry — per-tier retention read straight from diff --git a/internal/jobs/customer_backup_scheduler.go b/internal/jobs/customer_backup_scheduler.go index 92ae94e..923e006 100644 --- a/internal/jobs/customer_backup_scheduler.go +++ b/internal/jobs/customer_backup_scheduler.go @@ -10,17 +10,36 @@ // regardless of whether the row was inserted by this job or by the api // (manual backup) — there's exactly one downstream code path. // -// Cadence by tier (resource.tier, NOT team.plan_tier, so a row that was -// snapshotted at the higher tier keeps its cadence even after a downgrade): +// Cadence is RPO-DRIVEN, not a hardcoded tier list. The decision reads the +// per-tier `rpo_minutes` promise from plans.yaml (via the BackupPlanRegistry) +// so the effective backup cadence always matches what the product SELLS: // -// team / pro / growth → every hour -// hobby → once per day, at the team's daily slot -// anonymous → never (24h TTL, the bundle assumes you re-claim) +// rpo_minutes in [1,60] → every hour (pro / growth / team = 60) +// rpo_minutes > 60 → once per day, at the team's daily slot +// (hobby / hobby_plus = 1440) +// rpo_minutes == 0 → never enqueued (anonymous / free have +// backup_retention_days:0 and promise no RPO; +// 24h TTL, the bundle assumes you re-claim) // -// Hobby's "daily slot" is the lowest 4 bits of the team UUID mod 24, applied -// as the hour-of-day in UTC. This spreads daily backups across the 24 hours -// of the day deterministically per team — so a 2K-team customer base gets a -// flat ~83 backups/hour instead of a 2K-backup spike at midnight UTC. +// R1 (2026-06-10): before this change pro/growth/team were backed up hourly +// but the cadence was wired to a hardcoded `switch canonicalTier()`. Changing +// rpo_minutes in plans.yaml would NOT have moved the cadence — an +// over-promise waiting to happen. Driving the gate off Registry.RPOMinutes +// closes that drift: plans.yaml is now the single source of truth for both +// the advertised RPO AND the cadence that makes it true. When the registry +// is nil (boot misconfigured) we fall back to the legacy hardcoded mapping +// so a bad boot never silently stops every paid backup. +// +// The hourlyRPOCutoffMinutes constant is the line between hourly and daily. +// 60 is not arbitrary: a tier promising rpo_minutes<=60 needs a fresh backup +// at least once an hour or the worst-case data-loss window exceeds the +// promise. Tiers promising a coarser RPO (1440 = daily) are served by the +// once-per-day slot below. +// +// The "daily slot" is the first byte of the team UUID mod 24, applied as the +// hour-of-day in UTC. This spreads daily backups across the 24 hours of the +// day deterministically per team — so a 2K-team customer base gets a flat +// ~83 backups/hour instead of a 2K-backup spike at midnight UTC. // // Dedupe is enforced by a 50-minute lookback per resource: the same hour-bucket // won't double-insert if the scheduler runs twice (e.g. on a worker restart @@ -52,15 +71,27 @@ func (CustomerBackupSchedulerArgs) Kind() string { return "customer_backup_sched // postgres/vector resource whose tier is due for a backup this hour. type CustomerBackupSchedulerWorker struct { river.WorkerDefaults[CustomerBackupSchedulerArgs] - db *sql.DB - now func() time.Time // injectable for tests + db *sql.DB + plans BackupPlanRegistry // RPO-cadence source of truth; nil → legacy fallback + now func() time.Time // injectable for tests } +// hourlyRPOCutoffMinutes is the boundary between hourly and daily cadence. +// A tier whose plans.yaml rpo_minutes is in [1, hourlyRPOCutoffMinutes] is +// backed up every hour; a coarser RPO (> cutoff) gets the once-daily slot. +// 60 matches the pro/growth/team rpo_minutes:60 promise — a fresh backup at +// least hourly is required to honour a 60-minute RPO. +const hourlyRPOCutoffMinutes = 60 + // NewCustomerBackupSchedulerWorker constructs a CustomerBackupSchedulerWorker. -// now is injected so the daily-slot logic can be exercised at fixed times in -// tests; production callers pass nil and we default to time.Now. -func NewCustomerBackupSchedulerWorker(db *sql.DB) *CustomerBackupSchedulerWorker { - return &CustomerBackupSchedulerWorker{db: db, now: time.Now} +// plans is the source of truth for per-tier RPO → cadence; pass the same +// BackupPlanRegistry wired into the runner. When plans is nil the scheduler +// falls back to the legacy hardcoded tier→cadence mapping so a misconfigured +// boot doesn't silently stop every paid backup. now is injected so the +// daily-slot logic can be exercised at fixed times in tests; production +// callers pass time.Now via the constructor default. +func NewCustomerBackupSchedulerWorker(db *sql.DB, plans BackupPlanRegistry) *CustomerBackupSchedulerWorker { + return &CustomerBackupSchedulerWorker{db: db, plans: plans, now: time.Now} } // canonicalTier strips the "_yearly" suffix from a plan tier name so the @@ -88,6 +119,65 @@ func hobbyDailySlot(teamID uuid.UUID) int { return int(teamID[0]) % 24 } +// cadence enumerates how often a tier is backed up. Derived from the tier's +// plans.yaml rpo_minutes promise — see cadenceForTier. +type cadence int + +const ( + cadenceNever cadence = iota // rpo_minutes==0 (anonymous/free) — skip entirely + cadenceHourly // rpo_minutes in [1,60] — every tick + cadenceDaily // rpo_minutes>60 — once per day at the team slot +) + +// cadenceForTier maps a resource tier to its backup cadence using the +// per-tier rpo_minutes from plans.yaml — the same value the product +// advertises on /api/v1/capabilities. This is the keystone of R1: the +// cadence that MAKES the RPO true is read from the SAME field that PROMISES +// it, so the two can never drift. +// +// When plans is nil (boot misconfigured) we fall back to the historical +// hardcoded mapping (pro/growth/team hourly, hobby/hobby_plus daily, all +// _yearly variants follow their canonical tier, everything else never) so a +// bad boot degrades to the known-good behaviour rather than silently +// stopping every paid backup. +func cadenceForTier(plans BackupPlanRegistry, tier string) cadence { + if plans == nil { + return legacyCadenceForTier(tier) + } + // rpo_minutes is the source of truth. 0 = "not promised" → no scheduled + // backups (anonymous/free). retention 0 is a second, independent guard: + // a tier that keeps backups 0 days must never have one enqueued even if + // some future plans.yaml edit left a stray rpo_minutes on it. + if plans.BackupRetentionDays(tier) <= 0 { + return cadenceNever + } + rpo := plans.RPOMinutes(tier) + switch { + case rpo <= 0: + return cadenceNever + case rpo <= hourlyRPOCutoffMinutes: + return cadenceHourly + default: + return cadenceDaily + } +} + +// legacyCadenceForTier is the pre-R1 hardcoded mapping, retained ONLY as the +// nil-registry fallback. New tiers added to plans.yaml are NOT reflected +// here — the registry path above is authoritative. Mirrors the old +// `switch canonicalTier()` gate so a registry-less boot behaves identically +// to the prior release. +func legacyCadenceForTier(tier string) cadence { + switch canonicalTier(tier) { + case "hobby", "hobby_plus": + return cadenceDaily + case "pro", "growth", "team": + return cadenceHourly + default: + return cadenceNever + } +} + // Work performs the per-tick sweep. Every step is fail-open at the // per-resource granularity — a single bad row never blocks the rest of the // sweep, matching the convention from expire.go / quota.go. @@ -103,27 +193,26 @@ func (w *CustomerBackupSchedulerWorker) Work(ctx context.Context, job *river.Job // carries the user-paid retention contract (mirrors ElevateResourceTiers // on the api side). // - // FIX-H (#56/#R6 B36) — the prior hardcoded set - // (hobby, pro, growth, team) silently excluded hobby_plus and every - // _yearly variant, so paid hobby_plus / hobby_plus_yearly / pro_yearly / - // growth_yearly / team_yearly customers received ZERO scheduled - // backups. The fix lists every tier whose plans.yaml row has - // backup_retention_days > 0. We keep the list inline rather than - // querying plans.Registry here because the scheduler doesn't yet - // take a Registry — adding a registry param would force a constructor - // change across cmd/, deferred to a separate refactor. + // R1 (2026-06-10) — the candidate SELECT now only excludes the + // never-backup tiers (anonymous/free, backup_retention_days:0) instead + // of an explicit allow-list of paid tiers. The authoritative cadence + // decision (hourly vs daily vs skip) is made per-row by cadenceForTier, + // which reads rpo_minutes from plans.yaml — so a NEW paid tier added to + // plans.yaml is picked up automatically (no SQL edit), and the per-row + // registry gate is a second, independent skip for anything that leaks + // through. The SQL exclusion is cheap defence-in-depth: it keeps the + // candidate set small without re-deriving the registry in SQL. + // + // Pre-R1 (FIX-H #56/#R6) this was a hardcoded allow-list that silently + // dropped hobby_plus + every _yearly variant when first written; the + // registry-driven path removes that single-site-list failure mode + // entirely (root CLAUDE.md rule 18). rows, err := w.db.QueryContext(ctx, ` SELECT r.id::text, r.tier, r.team_id FROM resources r WHERE r.status = 'active' AND r.resource_type IN ('postgres', 'vector') - AND r.tier IN ( - 'hobby', 'hobby_yearly', - 'hobby_plus', 'hobby_plus_yearly', - 'pro', 'pro_yearly', - 'growth', 'growth_yearly', - 'team', 'team_yearly' - ) + AND r.tier NOT IN ('anonymous', 'free') `) if err != nil { return fmt.Errorf("CustomerBackupSchedulerWorker: query failed: %w", err) @@ -149,30 +238,35 @@ func (w *CustomerBackupSchedulerWorker) Work(ctx context.Context, job *river.Job } inserted := 0 - skippedHobbyOffSlot := 0 + skippedNeverTier := 0 + skippedDailyOffSlot := 0 skippedDedup := 0 for _, c := range candidates { - // Cadence gate. - // - // FIX-H — hobby and hobby_plus (and their _yearly variants) run - // at one daily slot per team. Pro / Growth / Team (and yearly - // counterparts) back up every hour. canonicalTier strips the - // _yearly suffix so hobby_yearly / hobby_plus_yearly share the - // daily-slot policy with their monthly canonical tier. - switch canonicalTier(c.tier) { - case "hobby", "hobby_plus": + // Cadence gate — RPO-driven (see cadenceForTier). The tier's + // plans.yaml rpo_minutes decides hourly vs daily vs skip, so the + // cadence always matches the advertised RPO. + switch cadenceForTier(w.plans, c.tier) { + case cadenceNever: + // rpo_minutes==0 / backup_retention_days==0 — the tier promises + // no RPO (anonymous/free, or anything that slipped past the SQL + // filter). Never enqueue. + skippedNeverTier++ + continue + case cadenceDaily: if !c.teamID.Valid { - // Defensive: a tier='hobby*' resource without a team_id is - // nonsensical (only anonymous rows have NULL team) but if - // it slips in we skip rather than panic-divide. + // Defensive: a daily-cadence (hobby*) resource without a + // team_id is nonsensical (only anonymous rows have NULL + // team) but if it slips in we skip rather than panic-divide. + skippedNeverTier++ continue } if hobbyDailySlot(c.teamID.UUID) != hourUTC { - skippedHobbyOffSlot++ + skippedDailyOffSlot++ continue } + case cadenceHourly: + // Every tick — fall through to insert. } - // pro / growth / team (any variant) + hobby/hobby_plus-on-slot proceed. rid, err := uuid.Parse(c.id) if err != nil { @@ -229,11 +323,12 @@ func (w *CustomerBackupSchedulerWorker) Work(ctx context.Context, job *river.Job // DEBUG. customer_backup_scheduler runs every 1h; an idle tick (no // candidates AND nothing inserted/skipped) is heartbeat noise. INFO // retained for any state-transitioning tick. - if inserted == 0 && skippedHobbyOffSlot == 0 && skippedDedup == 0 && len(candidates) == 0 { + if inserted == 0 && skippedNeverTier == 0 && skippedDailyOffSlot == 0 && skippedDedup == 0 && len(candidates) == 0 { slog.Debug("jobs.customer_backup_scheduler.completed", "candidates", 0, "inserted", 0, - "skipped_hobby_off_slot", 0, + "skipped_never_tier", 0, + "skipped_daily_off_slot", 0, "skipped_dedup", 0, "hour_utc", hourUTC, "duration_ms", time.Since(start).Milliseconds(), @@ -244,7 +339,8 @@ func (w *CustomerBackupSchedulerWorker) Work(ctx context.Context, job *river.Job slog.Info("jobs.customer_backup_scheduler.completed", "candidates", len(candidates), "inserted", inserted, - "skipped_hobby_off_slot", skippedHobbyOffSlot, + "skipped_never_tier", skippedNeverTier, + "skipped_daily_off_slot", skippedDailyOffSlot, "skipped_dedup", skippedDedup, "hour_utc", hourUTC, "duration_ms", time.Since(start).Milliseconds(), diff --git a/internal/jobs/customer_backup_scheduler_test.go b/internal/jobs/customer_backup_scheduler_test.go index ffb839c..2730e79 100644 --- a/internal/jobs/customer_backup_scheduler_test.go +++ b/internal/jobs/customer_backup_scheduler_test.go @@ -17,6 +17,49 @@ import ( // and can't reach the package-test fixture. var errDBInPkg = errors.New("db error (in-package fixture)") +// schedulerPlans returns a BackupPlanRegistry whose rpo_minutes + +// backup_retention_days mirror the real api/plans.yaml values the scheduler +// reads in prod. Pinning the values here (rather than loading the embedded +// YAML) means a future plans.yaml edit that breaks the cadence contract +// fails an explicit assertion, not a moved goalpost — and the cadence logic +// is exercised against the exact promise numbers. +// +// pro / pro_yearly / growth / team / team_yearly → rpo 60 (HOURLY) +// hobby / hobby_yearly / hobby_plus* → rpo 1440 (DAILY) +// anonymous / free → rpo 0 (NEVER) +func schedulerPlans() *fakeBackupPlanRegistry { + return &fakeBackupPlanRegistry{ + rpo: map[string]int{ + "pro": 60, + "pro_yearly": 60, + "growth": 60, + "growth_yearly": 60, + "team": 60, + "team_yearly": 60, + "hobby": 1440, + "hobby_yearly": 1440, + "hobby_plus": 1440, + "hobby_plus_yearly": 1440, + "anonymous": 0, + "free": 0, + }, + days: map[string]int{ + "pro": 30, + "pro_yearly": 30, + "growth": 30, + "growth_yearly": 30, + "team": 90, + "team_yearly": 90, + "hobby": 7, + "hobby_yearly": 7, + "hobby_plus": 14, + "hobby_plus_yearly": 14, + "anonymous": 0, + "free": 0, + }, + } +} + // fakeSchedulerJob is the in-package twin of jobs_test.fakeJob — needed // because this test file lives in `package jobs` so it can poke `now` on // the unexported worker fields without going through a constructor knob. @@ -44,8 +87,110 @@ func TestHobbyDailySlot_Deterministic(t *testing.T) { } } -// TestScheduler_InsertsForProTierEveryHour — happy path. A single pro -// postgres resource yields one INSERT regardless of current hour. +// TestCadenceForTier_RPODriven is the keystone R1 assertion: the cadence is +// derived from plans.yaml rpo_minutes, NOT a hardcoded tier list. Every paid +// tier promising rpo<=60 must be HOURLY; coarser-RPO tiers DAILY; rpo==0 +// (anonymous/free) NEVER. This test iterates the registry's declared map so +// adding a new tier with the wrong cadence trips here (root rule 18 — no +// hand-typed single-site list). +func TestCadenceForTier_RPODriven(t *testing.T) { + plans := schedulerPlans() + want := map[string]cadence{ + "pro": cadenceHourly, + "pro_yearly": cadenceHourly, + "growth": cadenceHourly, + "growth_yearly": cadenceHourly, + "team": cadenceHourly, + "team_yearly": cadenceHourly, + "hobby": cadenceDaily, + "hobby_yearly": cadenceDaily, + "hobby_plus": cadenceDaily, + "hobby_plus_yearly": cadenceDaily, + "anonymous": cadenceNever, + "free": cadenceNever, + } + // Iterate the registry's declared tiers — a tier present in plans but + // missing from `want` is an un-asserted cadence and fails loudly. + for tier := range plans.rpo { + exp, ok := want[tier] + if !ok { + t.Fatalf("tier %q present in registry but not asserted — add it to want", tier) + } + if got := cadenceForTier(plans, tier); got != exp { + t.Errorf("cadenceForTier(%q) = %d, want %d (rpo=%d)", tier, got, exp, plans.RPOMinutes(tier)) + } + } +} + +// TestCadenceForTier_HourlyCutoffBoundary pins the [1,60]→hourly, +// >60→daily boundary so a plans.yaml edit nudging rpo_minutes across 60 +// flips the cadence as intended (and rpo==0 is always never). +func TestCadenceForTier_HourlyCutoffBoundary(t *testing.T) { + cases := []struct { + rpo int + days int + want cadence + }{ + {0, 0, cadenceNever}, // no promise (anonymous/free) + {0, 7, cadenceNever}, // rpo unset but retention set → still never + {1, 7, cadenceHourly}, // tightest non-zero RPO + {60, 30, cadenceHourly}, // exactly the pro/growth/team promise + {61, 30, cadenceDaily}, // just over the line + {1440, 7, cadenceDaily}, // hobby daily + {-5, 7, cadenceNever}, // defensive: negative treated as no-promise + } + for _, c := range cases { + plans := &fakeBackupPlanRegistry{ + rpo: map[string]int{"x": c.rpo}, + days: map[string]int{"x": c.days}, + } + if got := cadenceForTier(plans, "x"); got != c.want { + t.Errorf("rpo=%d days=%d: cadence=%d, want %d", c.rpo, c.days, got, c.want) + } + } +} + +// TestCadenceForTier_ZeroRetentionNeverBacksUp — independent of rpo_minutes, +// a tier with backup_retention_days<=0 must NEVER be enqueued (R1: never back +// up anonymous/free). Even a stray hourly rpo_minutes on a zero-retention +// tier is overridden to "never". +func TestCadenceForTier_ZeroRetentionNeverBacksUp(t *testing.T) { + plans := &fakeBackupPlanRegistry{ + rpo: map[string]int{"weird": 60}, // would be hourly on rpo alone… + days: map[string]int{"weird": 0}, // …but 0-day retention vetoes it + } + if got := cadenceForTier(plans, "weird"); got != cadenceNever { + t.Errorf("zero-retention tier cadence=%d, want cadenceNever", got) + } +} + +// TestCadenceForTier_NilRegistryLegacyFallback — when no registry is wired +// (boot misconfigured) the cadence falls back to the legacy hardcoded +// mapping so paid backups never silently stop. +func TestCadenceForTier_NilRegistryLegacyFallback(t *testing.T) { + cases := map[string]cadence{ + "pro": cadenceHourly, + "pro_yearly": cadenceHourly, + "growth": cadenceHourly, + "team": cadenceHourly, + "team_yearly": cadenceHourly, + "hobby": cadenceDaily, + "hobby_plus": cadenceDaily, + "hobby_yearly": cadenceDaily, + "anonymous": cadenceNever, + "free": cadenceNever, + "": cadenceNever, + } + for tier, want := range cases { + if got := cadenceForTier(nil, tier); got != want { + t.Errorf("legacy cadenceForTier(nil, %q) = %d, want %d", tier, got, want) + } + } +} + +// TestScheduler_InsertsForProTierEveryHour — happy path / R1 core ask: a +// single pro postgres resource (rpo_minutes:60) yields one INSERT regardless +// of the current hour, i.e. it becomes due HOURLY. func TestScheduler_InsertsForProTierEveryHour(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -67,7 +212,7 @@ func TestScheduler_InsertsForProTierEveryHour(t *testing.T) { WithArgs(uuid.MustParse(resID), "pro"). WillReturnResult(sqlmock.NewResult(1, 1)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) // Pin time to 14:00 UTC — for pro this is irrelevant (always inserts). w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } @@ -79,9 +224,102 @@ func TestScheduler_InsertsForProTierEveryHour(t *testing.T) { } } -// TestScheduler_HobbyOffSlot_Skips — hobby tier should NOT insert when -// the current hour-of-day != its daily slot. We construct a team whose -// slot is 5 and run the scheduler at hour 14 — expect no INSERT. +// TestScheduler_ProDueEveryHour_AcrossAllHours — stronger R1 proof: a pro +// resource is enqueued at EVERY hour of the day, never skipped. Catches a +// regression where an off-by-one or stray slot gate accidentally throttled +// an hourly tier to daily. +func TestScheduler_ProDueEveryHour_AcrossAllHours(t *testing.T) { + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + for hour := 0; hour < 24; hour++ { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("hour %d: sqlmock.New: %v", hour, err) + } + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "pro", teamID)) + // MUST insert at every hour — RowsAffected=1. + mock.ExpectExec(`INSERT INTO resource_backups`). + WithArgs(uuid.MustParse(resID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 5, 13, hour, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("hour %d: Work: %v", hour, err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("hour %d: pro must be due hourly but was not enqueued: %v", hour, err) + } + db.Close() + } +} + +// TestScheduler_FreeTierNeverEnqueued — R1 hard requirement: a `free` tier +// resource (rpo_minutes:0, backup_retention_days:0) must NEVER be enqueued. +// The SQL filter excludes 'free' up front, so the candidate set is empty and +// NO INSERT is ever issued. +func TestScheduler_FreeTierNeverEnqueued(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + // The SQL excludes anonymous/free, so the SELECT returns no free rows. + // We simulate the real query returning empty (free filtered out in SQL). + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"})) + // NO INSERT expected — any ExpectExec on resource_backups would be unmet. + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestScheduler_FreeTierGate_SkipsEvenIfRowLeaksThrough — defence-in-depth +// for R1. Even if a `free` row somehow leaks past the SQL filter (e.g. a +// race that flips tier after the WHERE evaluates), the per-row registry +// gate (cadenceForTier → cadenceNever) MUST skip it: no INSERT. +func TestScheduler_FreeTierGate_SkipsEvenIfRowLeaksThrough(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + // The mock returns a 'free' row as if it leaked past the SQL filter. + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "free", teamID)) + // No INSERT — the registry cadence gate must veto the free row. + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("free row was not skipped by the registry gate: %v", err) + } +} + +// TestScheduler_HobbyOffSlot_Skips — hobby tier (daily cadence) should NOT +// insert when the current hour-of-day != its daily slot. We construct a team +// whose slot is 5 and run the scheduler at hour 14 — expect no INSERT. func TestScheduler_HobbyOffSlot_Skips(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -101,7 +339,7 @@ func TestScheduler_HobbyOffSlot_Skips(t *testing.T) { AddRow(resID, "hobby", teamID)) // No EXISTS, no INSERT. - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { @@ -113,7 +351,7 @@ func TestScheduler_HobbyOffSlot_Skips(t *testing.T) { } // TestScheduler_HobbyOnSlot_Inserts — when the current hour matches the -// team's daily slot, the hobby row gets inserted. +// team's daily slot, the hobby (daily cadence) row gets inserted. func TestScheduler_HobbyOnSlot_Inserts(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -131,7 +369,7 @@ func TestScheduler_HobbyOnSlot_Inserts(t *testing.T) { WithArgs(uuid.MustParse(resID), "hobby"). WillReturnResult(sqlmock.NewResult(1, 1)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) // Hour 5 = the team's slot. w.now = func() time.Time { return time.Date(2026, 5, 13, 5, 0, 0, 0, time.UTC) } @@ -143,12 +381,16 @@ func TestScheduler_HobbyOnSlot_Inserts(t *testing.T) { } } -// TestScheduler_DedupExists_Skips — a recent row inside the 50min lookback -// should suppress the INSERT. P2-W4: the dedupe is now atomic inside the -// INSERT statement, so the worker always issues the INSERT but a recent -// row makes the `WHERE NOT EXISTS` arm match → RowsAffected=0 → the worker -// counts it as a deduped skip rather than an inserted row. -func TestScheduler_DedupExists_Skips(t *testing.T) { +// TestScheduler_NoDuplicateEnqueueWithinHour — R1 idempotency requirement: +// a recent scheduled row inside the 50min lookback must suppress a second +// INSERT in the same hour. The dedupe is atomic inside the INSERT statement +// (P2-W4), so the worker always issues the INSERT but a recent row makes the +// `WHERE NOT EXISTS` arm match → RowsAffected=0 → no duplicate backup. +// +// This proves "no duplicate enqueue within an hour": even though the +// scheduler runs and re-evaluates the pro resource (hourly cadence), the DB +// dedupe arm collapses the second attempt to a no-op. +func TestScheduler_NoDuplicateEnqueueWithinHour(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { t.Fatalf("sqlmock.New: %v", err) @@ -161,12 +403,12 @@ func TestScheduler_DedupExists_Skips(t *testing.T) { mock.ExpectQuery(`SELECT r.id::text`). WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). AddRow(resID, "pro", teamID)) - // INSERT runs but the NOT EXISTS arm matches the recent row → 0 rows. + // INSERT runs but the NOT EXISTS arm matches the recent (<50min) row → 0 rows. mock.ExpectExec(`INSERT INTO resource_backups`). WithArgs(uuid.MustParse(resID), "pro"). WillReturnResult(sqlmock.NewResult(0, 0)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { @@ -183,6 +425,11 @@ func TestScheduler_DedupExists_Skips(t *testing.T) { // The old check-then-act shape let two concurrent ticks both observe // existed=false and both INSERT, double-scheduling a backup. // +// Combined with the 50-minute DB lookback, this is what survives the +// WeeklyDigest-fired-daily failure mode (River periodic + RunOnStart can +// fire two ticks back-to-back on restart): dedupe lives in the DB, not in +// River's UniqueOpts. +// // sqlmock's QueryMatcherRegexp asserts the worker issues exactly one // statement carrying both `INSERT INTO resource_backups` and the // `WHERE NOT EXISTS` guard — a regression to the two-statement shape @@ -206,7 +453,7 @@ func TestScheduler_DedupeIsAtomicInsert(t *testing.T) { WithArgs(uuid.MustParse(resID), "pro"). WillReturnResult(sqlmock.NewResult(1, 1)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { @@ -217,11 +464,47 @@ func TestScheduler_DedupeIsAtomicInsert(t *testing.T) { } } +// TestScheduler_DedupeLookbackIsUnderOneHour pins the 50-minute lookback: +// the dedupe window must be < 60 minutes so that the SAME hour-bucket is +// always covered by the previous tick's row, yet the NEXT hour's tick (an +// hourly-cadence pro tier) is NOT suppressed. The INSERT SQL must carry the +// `INTERVAL '50 minutes'` guard. +func TestScheduler_DedupeLookbackIsUnderOneHour(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + mock.ExpectQuery(`SELECT r\.id::text`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "pro", teamID)) + // Assert the dedupe interval is 50 minutes (< 1h) — a regression to e.g. + // 60+ minutes would suppress the legitimate next-hour backup. + mock.ExpectExec(`INTERVAL '50 minutes'`). + WithArgs(uuid.MustParse(resID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("dedupe lookback is not '50 minutes' (<1h): %v", err) + } +} + // TestScheduler_HobbyPlus_OnSlotInserts — FIX-H regression. Hobby Plus -// (the $19/mo mid-tier) MUST be in the scheduled-backup set. Pre-fix the -// scheduler hardcoded `tier IN ('hobby','pro','growth','team')` and any -// hobby_plus / hobby_plus_yearly / pro_yearly customer received zero -// scheduled backups despite paying for them. +// (the $19/mo mid-tier, rpo_minutes:1440 → daily) MUST be in the +// scheduled-backup set. Pre-fix the scheduler hardcoded +// `tier IN ('hobby','pro','growth','team')` and any hobby_plus customer +// received zero scheduled backups despite paying for them. Now driven by +// the registry, so any paid tier with retention>0 is covered. func TestScheduler_HobbyPlus_OnSlotInserts(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -243,7 +526,7 @@ func TestScheduler_HobbyPlus_OnSlotInserts(t *testing.T) { WithArgs(uuid.MustParse(resID), "hobby_plus"). WillReturnResult(sqlmock.NewResult(1, 1)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) w.now = func() time.Time { return time.Date(2026, 5, 14, 5, 0, 0, 0, time.UTC) } if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { @@ -255,9 +538,9 @@ func TestScheduler_HobbyPlus_OnSlotInserts(t *testing.T) { } // TestScheduler_YearlyVariants_BackupHourly — pro_yearly and team_yearly -// (and any other _yearly tier with hourly cadence) must back up every -// hour just like their canonical monthly counterpart. Regression guard -// for the FIX-H widened tier set. +// (rpo_minutes:60 → hourly) must back up every hour just like their canonical +// monthly counterpart. Regression guard: the registry resolves the variant's +// own rpo_minutes, so no _yearly special-casing is needed in the scheduler. func TestScheduler_YearlyVariants_BackupHourly(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { @@ -275,7 +558,7 @@ func TestScheduler_YearlyVariants_BackupHourly(t *testing.T) { WithArgs(uuid.MustParse(resID), "pro_yearly"). WillReturnResult(sqlmock.NewResult(1, 1)) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) // Hour 14 — pro_yearly should fire regardless (hourly cadence). w.now = func() time.Time { return time.Date(2026, 5, 14, 14, 0, 0, 0, time.UTC) } @@ -287,22 +570,53 @@ func TestScheduler_YearlyVariants_BackupHourly(t *testing.T) { } } +// TestScheduler_NilRegistry_LegacyFallbackInserts — when no registry is +// wired the scheduler falls back to the legacy hardcoded cadence (pro hourly) +// so paid backups never silently stop on a misconfigured boot. +func TestScheduler_NilRegistry_LegacyFallbackInserts(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "pro", teamID)) + mock.ExpectExec(`INSERT INTO resource_backups`). + WithArgs(uuid.MustParse(resID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewCustomerBackupSchedulerWorker(db, nil) // nil registry → legacy path + w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("nil-registry legacy fallback did not enqueue pro: %v", err) + } +} + // TestCanonicalTier — sanity: _yearly strips, others pass through. func TestCanonicalTier(t *testing.T) { cases := map[string]string{ - "hobby": "hobby", - "hobby_yearly": "hobby", - "hobby_plus": "hobby_plus", - "hobby_plus_yearly": "hobby_plus", - "pro": "pro", - "pro_yearly": "pro", - "team": "team", - "team_yearly": "team", - "growth": "growth", - "growth_yearly": "growth", - "anonymous": "anonymous", - "": "", - "_yearly": "_yearly", // not stripped — guard: too short + "hobby": "hobby", + "hobby_yearly": "hobby", + "hobby_plus": "hobby_plus", + "hobby_plus_yearly": "hobby_plus", + "pro": "pro", + "pro_yearly": "pro", + "team": "team", + "team_yearly": "team", + "growth": "growth", + "growth_yearly": "growth", + "anonymous": "anonymous", + "": "", + "_yearly": "_yearly", // not stripped — guard: too short } for in, want := range cases { got := canonicalTier(in) @@ -322,7 +636,7 @@ func TestScheduler_DBSelectError_ReturnsError(t *testing.T) { mock.ExpectQuery(`SELECT r.id::text`).WillReturnError(errDBInPkg) - w := NewCustomerBackupSchedulerWorker(db) + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) if err := w.Work(context.Background(), fakeSchedulerJob()); err == nil { t.Fatal("expected error from SELECT failure, got nil") } diff --git a/internal/jobs/workers.go b/internal/jobs/workers.go index 6aa052d..1189728 100644 --- a/internal/jobs/workers.go +++ b/internal/jobs/workers.go @@ -603,7 +603,11 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi // unset — fail-open so a dev environment that doesn't ship AES keys // doesn't block worker boot. See each worker's Work() top for the // exact WARN line emitted. - river.AddWorker(workers, WithObservability(NewCustomerBackupSchedulerWorker(db), nrApp)) + // backupPlans drives the scheduler's RPO-aware cadence: tiers promising + // rpo_minutes<=60 (pro/growth/team) get hourly backups; coarser-RPO + // tiers (hobby/hobby_plus = 1440) get the once-daily slot; anonymous/free + // (rpo_minutes:0) are never enqueued. nil → legacy hardcoded fallback. + river.AddWorker(workers, WithObservability(NewCustomerBackupSchedulerWorker(db, backupPlans), nrApp)) // FIX-H #65/#Q47 — wire the refund client so terminal MANUAL backup // failures credit the team's daily counter via the api's internal // /internal/teams/:id/backup-quota/refund endpoint. Empty apiBase /