diff --git a/Dockerfile b/Dockerfile index 0101cdb..f78fd8f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,5 +30,20 @@ RUN CGO_ENABLED=0 go build \ # the same PR — pg_dump major version must be >= server major version, and # matching exactly keeps the dump format ABI predictable. FROM postgres:16-alpine +# R2 (2026-06-10): the customer-backup ladder grew from postgres/vector +# (pg_dump, already in this base) to also cover mongodb (mongodump/mongorestore) +# and redis (redis-cli --rdb). Those binaries are NOT in postgres:16-alpine, so +# install them here or the new dump strategies fail at exec with "executable +# file not found" (the runner fail-opens → marks the row failed + increments +# instant_customer_backup_by_type_total{result="failed"}; no data loss, but no +# Mongo/Redis backup either). mongodb-tools = mongodump+mongorestore; redis = +# redis-cli. --no-cache keeps the image lean. +# +# Version note (mirrors the pg_dump >= server rule above): mongodump's archive +# format is forward-compatible within a major, and redis-cli --rdb speaks the +# RDB version of the server it connects to, so alpine's packaged versions are +# fine against the in-cluster mongodb/redis. Bump deliberately if the data-tier +# images move a major version. +RUN apk add --no-cache mongodb-tools redis COPY --from=builder /worker /worker ENTRYPOINT ["/worker"] diff --git a/internal/jobs/backup_dump.go b/internal/jobs/backup_dump.go new file mode 100644 index 0000000..a410d40 --- /dev/null +++ b/internal/jobs/backup_dump.go @@ -0,0 +1,277 @@ +// backup_dump.go — per-resource_type dump strategies for the customer-backup +// runner. Mirrors the pgDumpRunner seam already in customer_backup_runner.go +// so the runner can back up postgres/vector (pg_dump), mongodb (mongodump), +// and redis (redis-cli --rdb) through ONE pipeline (gzip → sha256 → S3) with +// ONE retention/cadence/keep-last-N policy. +// +// R2 (2026-06-10) — closing the durability gap. Before this file the backup +// ladder backed up postgres/vector ONLY; the product sells "backups + +// 1-click restore" for ALL paid resources, so Mongo + Redis had ZERO +// automated backup (worker #103 note + GAP-AUDIT-2026-06-10). This file adds +// the Mongo + Redis dump strategies; the runner dispatches on resource_type. +// +// THE GZIP CONTRACT (why this matters): the existing pg path writes a RAW +// (uncompressed) `pg_dump --format=custom` archive into the runner's gzip +// writer — the pipeline owns compression, the S3 object is `.gz`, +// and the restore path gunzips then pipes to pg_restore. To keep ONE +// pipeline + ONE object layout + ONE sha256/restore story, every dumpRunner +// here writes RAW (uncompressed) bytes too. Concretely: +// +// - mongodump: `--archive` (NOT `--archive --gzip`). mongodump's own +// --gzip would double-compress under the pipeline's gzip layer, bloating +// the object and breaking the "gunzip → mongorestore --archive" restore +// symmetry. Restore gunzips, then pipes to `mongorestore --archive`. +// - redis-cli: `--rdb -` streams the live RDB snapshot to stdout (a single +// uncompressed RDB blob). The pipeline gzips it to `.dump.gz` exactly +// like the pg/mongo archives. +// +// SECRET HYGIENE (mirrors SEC-WORKER FINDING-2 on the pg path): the customer +// credential must NOT sit in argv (/proc//cmdline, `ps aux`, kubectl +// describe crash archive) for the multi-minute backup window. pg_dump uses +// PGPASSWORD env (splitPGPassword). mongodump accepts the full mongodb URI in +// `--uri` — the URI carries the password, so we pass it on stdin-equivalent… +// mongodump has no env-password knob, BUT it DOES read the URI from +// `--uri=`? No: mongodump's only password-out-of-argv path is +// interactive prompt, which we can't drive. We therefore pass the URI via the +// MONGODB_URI-equivalent the tool honors: mongodump reads `--uri` from argv. +// To keep the secret out of argv we instead write the URI to a 0600 temp file +// and pass `--config=` (mongodump supports a YAML config with a +// `uri:`/`password:` field). See realMongoDumpRunner for the exact mechanism. +package jobs + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "os/exec" +) + +// Resource-type string constants — the values stored in resources.resource_type +// and echoed in resource_backups. Kept as named constants (not scattered +// literals) so the dispatch + scheduler + tests reference one source. +const ( + resourceTypePostgres = "postgres" + resourceTypeVector = "vector" + resourceTypeMongoDB = "mongodb" + resourceTypeRedis = "redis" +) + +// mongoDumpRunner abstracts `mongodump` execution so tests can substitute a +// fake without a live Mongo. Mirrors pgDumpRunner exactly: Run writes the RAW +// (uncompressed) BSON archive to w; the runner's gzip layer compresses it. +type mongoDumpRunner interface { + Run(ctx context.Context, connURL string, w io.Writer) error +} + +// redisDumpRunner abstracts `redis-cli --rdb -` execution. Run writes the RAW +// (uncompressed) RDB snapshot to w; the runner's gzip layer compresses it. +type redisDumpRunner interface { + Run(ctx context.Context, connURL string, w io.Writer) error +} + +// realMongoDumpRunner shells out to the real `mongodump` binary, streaming a +// `--archive` (uncompressed) BSON archive to stdout. +// +// Secret hygiene: the mongodb URI carries the password in its userinfo. To +// keep it out of argv (mongodump has no PGPASSWORD-style env knob), we write a +// minimal mongodump YAML config file (mode 0600, in the pod's tmpfs) carrying +// `uri:` and pass `--config=`. The file is removed on return. Fail-open +// on a temp-file error: fall back to `--uri` in argv (no regression vs a +// world with no mongo backup at all) and log nothing here — the runner's +// failure path captures any downstream error. +type realMongoDumpRunner struct{} + +func (realMongoDumpRunner) Run(ctx context.Context, connURL string, w io.Writer) error { + // Try the config-file path first so the URI (with password) stays out of + // argv. mongodump's config file is YAML with a top-level `uri:` key. + cfgPath, cleanup, cfgErr := writeMongoConfig(connURL) + var cmd *exec.Cmd + if cfgErr == nil { + defer cleanup() + cmd = exec.CommandContext(ctx, "mongodump", + "--config", cfgPath, + "--archive", // uncompressed; the runner pipeline gzips + ) + } else { + // Fail-open: pass the URI in argv. Less ideal (secret in cmdline) but + // strictly better than no backup. The leak window is the dump + // duration only, same posture the pg path documents for its parse + // fail-open branch. + cmd = exec.CommandContext(ctx, "mongodump", + "--uri", connURL, + "--archive", + ) + } + cmd.Stdout = w + var stderrBuf limitedBuffer + cmd.Stderr = &stderrBuf + if err := cmd.Run(); err != nil { + return fmt.Errorf("mongodump: %w (stderr: %s)", err, stderrBuf.String()) + } + return nil +} + +// Test seams for writeMongoConfig's filesystem operations. The chmod / write / +// sync failure arms cannot be forced against a real, freshly created temp file +// (a healthy fd accepts all three), so each op routes through an injectable +// package var — same seam pattern as txtLookupFunc (custom_domain_reconcile.go) +// and deployNotifyResolver (deploy_notify_webhook.go). Production behavior is +// the default literal; tests swap + defer-restore. +var ( + mongoCfgCreateTemp = func() (*os.File, error) { + return os.CreateTemp("", "instant-mongodump-*.yaml") + } + // 0600 — only the worker process can read the URI. CreateTemp already + // uses 0600 on unix, but set it explicitly so the contract is loud. + mongoCfgChmod = func(f *os.File) error { return f.Chmod(0o600) } + // mongodump config YAML: a single `uri:` key. Quote the value so a URI + // with YAML-special characters (e.g. a password containing ':' or '@') + // is parsed as a single scalar. + mongoCfgWriteURI = func(f *os.File, connURL string) error { + _, err := fmt.Fprintf(f, "uri: %q\n", connURL) + return err + } + mongoCfgSync = func(f *os.File) error { return f.Sync() } +) + +// writeMongoConfig writes a mongodump YAML config carrying the connection URI +// to a 0600 temp file and returns its path plus a cleanup func. Keeps the +// password out of argv. The caller MUST invoke cleanup() to remove the file. +func writeMongoConfig(connURL string) (path string, cleanup func(), err error) { + f, err := mongoCfgCreateTemp() + if err != nil { + return "", func() {}, fmt.Errorf("create mongodump config: %w", err) + } + cleanup = func() { + _ = f.Close() + _ = os.Remove(f.Name()) + } + if chmodErr := mongoCfgChmod(f); chmodErr != nil { + cleanup() + return "", func() {}, fmt.Errorf("chmod mongodump config: %w", chmodErr) + } + if wErr := mongoCfgWriteURI(f, connURL); wErr != nil { + cleanup() + return "", func() {}, fmt.Errorf("write mongodump config: %w", wErr) + } + if syncErr := mongoCfgSync(f); syncErr != nil { + cleanup() + return "", func() {}, fmt.Errorf("sync mongodump config: %w", syncErr) + } + return f.Name(), cleanup, nil +} + +// realRedisDumpRunner shells out to `redis-cli --rdb -`, streaming the live +// RDB snapshot to stdout. `-` for the filename means stdout (redis-cli +// 4.0+). The runner pipeline gzips the RDB blob. +// +// Secret hygiene: redis-cli accepts the password via the REDISCLI_AUTH env +// var (libredis honors it the same way PGPASSWORD works for pg_dump), so we +// split the password out of the URI and pass host/port/db/tls as flags + +// REDISCLI_AUTH on the env. The password never appears in argv. Fail-open on +// a URI parse error: pass the raw `-u ` form (secret in argv) so a +// malformed-but-valid-to-redis URI still backs up. +type realRedisDumpRunner struct{} + +func (realRedisDumpRunner) Run(ctx context.Context, connURL string, w io.Writer) error { + host, port, password, useTLS, parseErr := splitRedisURL(connURL) + var cmd *exec.Cmd + if parseErr == nil { + args := []string{"-h", host, "-p", port} + if useTLS { + args = append(args, "--tls") + } + args = append(args, "--rdb", "-") // "-" = stream RDB to stdout + cmd = exec.CommandContext(ctx, "redis-cli", args...) + if password != "" { + // REDISCLI_AUTH keeps the password out of argv (same posture as + // PGPASSWORD on the pg path). + cmd.Env = append(os.Environ(), "REDISCLI_AUTH="+password) + } + } else { + // Fail-open: -u (secret in argv). Strictly better than no + // backup; the leak window is the dump duration only. + cmd = exec.CommandContext(ctx, "redis-cli", "-u", connURL, "--rdb", "-") + } + cmd.Stdout = w + var stderrBuf limitedBuffer + cmd.Stderr = &stderrBuf + if err := cmd.Run(); err != nil { + return fmt.Errorf("redis-cli --rdb: %w (stderr: %s)", err, stderrBuf.String()) + } + return nil +} + +// splitRedisURL parses a redis://[:password@]host[:port][/db] (or rediss:// +// for TLS) URL into its parts so redis-cli can be invoked with the password +// out of argv (via REDISCLI_AUTH). Returns an error if the URL can't be +// parsed; the caller falls back to `-u ` (fail-open). Defaults: port +// 6379, db unset. rediss:// → useTLS=true. +func splitRedisURL(rawURL string) (host, port, password string, useTLS bool, err error) { + u, err := url.Parse(rawURL) + if err != nil { + return "", "", "", false, fmt.Errorf("parse redis url: %w", err) + } + switch u.Scheme { + case "redis": + useTLS = false + case "rediss": + useTLS = true + default: + return "", "", "", false, fmt.Errorf("unexpected redis scheme %q", u.Scheme) + } + host = u.Hostname() + if host == "" { + return "", "", "", false, fmt.Errorf("redis url missing host") + } + port = u.Port() + if port == "" { + port = "6379" + } + if u.User != nil { + if pw, ok := u.User.Password(); ok { + password = pw + } + } + return host, port, password, useTLS, nil +} + +// backupSupportedResourceType reports whether the customer-backup ladder knows +// how to dump the given resource_type. The scheduler's SQL filter and the +// runner's dispatch both anchor on this single predicate so the "what's +// backed up" set lives in one place (root rule 16/18 — no scattered list). +func backupSupportedResourceType(resourceType string) bool { + switch resourceType { + case resourceTypePostgres, resourceTypeVector, resourceTypeMongoDB, resourceTypeRedis: + return true + default: + return false + } +} + +// dumpForResourceType returns the dumpRunner Run func for the given +// resource_type, or nil + a descriptive reason when the type is unsupported. +// The runner uses the returned closure so postgres/vector/mongodb/redis all +// flow through ONE pipeline (gzip → sha256 → S3). Keeping the dispatch here +// (not inline in processBackup) lets the unit test assert the mapping +// directly. +func (w *CustomerBackupRunnerWorker) dumpForResourceType(resourceType string) (func(ctx context.Context, connURL string, out io.Writer) error, string) { + switch resourceType { + case resourceTypePostgres, resourceTypeVector: + return w.pgDump.Run, "" + case resourceTypeMongoDB: + if w.mongoDump == nil { + return nil, "mongo dump runner not configured" + } + return w.mongoDump.Run, "" + case resourceTypeRedis: + if w.redisDump == nil { + return nil, "redis dump runner not configured" + } + return w.redisDump.Run, "" + default: + return nil, fmt.Sprintf("unsupported resource_type %q for backup", resourceType) + } +} diff --git a/internal/jobs/backup_dump_failure_test.go b/internal/jobs/backup_dump_failure_test.go new file mode 100644 index 0000000..523d181 --- /dev/null +++ b/internal/jobs/backup_dump_failure_test.go @@ -0,0 +1,338 @@ +package jobs + +import ( + "bytes" + "context" + "errors" + "os" + "path/filepath" + "runtime" + "strings" + "testing" +) + +// backup_dump_failure_test.go — error-path + branch-arm coverage for the R2 +// mongo/redis dump + restore runners (backup_dump.go, +// customer_restore_runner.go). The happy paths live in backup_dump_test.go; +// this file pins: +// +// - the fail-open "secret in argv" branches (writeMongoConfig failure → +// --uri argv for mongodump/mongorestore; redis URL parse failure → +// `-u ` for redis-cli), +// - the subprocess-failure returns (non-zero exit → wrapped error carrying +// stderr), +// - every writeMongoConfig error arm (CreateTemp / chmod / write / sync) +// via the mongoCfg* package seams, +// - the rediss:// → --tls flag arm. + +// failMongoCfgCreateTemp swaps the mongoCfgCreateTemp seam for one that always +// fails, restoring the original on test cleanup. Forces the fail-open +// (URI-in-argv) branches of realMongoDumpRunner / realMongoRestoreRunner and +// the first error arm of writeMongoConfig. +func failMongoCfgCreateTemp(t *testing.T) { + t.Helper() + orig := mongoCfgCreateTemp + mongoCfgCreateTemp = func() (*os.File, error) { + return nil, errors.New("tmpfs exhausted (injected)") + } + t.Cleanup(func() { mongoCfgCreateTemp = orig }) +} + +// installFakeFailingBinary writes a shell-script stand-in that records argv, +// prints stderrMsg on stderr, and exits 1 — so the cmd.Run() error returns +// (which wrap the exit error + captured stderr) can be exercised without the +// real mongodump/mongorestore/redis-cli binaries. +func installFakeFailingBinary(t *testing.T, name, stderrMsg string) (dir string) { + t.Helper() + if runtime.GOOS == "windows" { + t.Skip("fake CLI script is shell-based; worker runs on linux/darwin only") + } + dir = t.TempDir() + script := "#!/bin/sh\n" + + "printf '%s\\n' \"$@\" > \"" + dir + "/argv.txt\"\n" + + "printf '%s' \"" + stderrMsg + "\" >&2\n" + + "exit 1\n" + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte(script), 0o755); err != nil { + t.Fatalf("write fake %s: %v", name, err) + } + t.Setenv("PATH", dir+string(os.PathListSeparator)+os.Getenv("PATH")) + return dir +} + +// TestRealMongoDumpRunner_FailOpenURIInArgv — when the 0600 config file can't +// be written, the runner falls open to `--uri ` in argv (documented +// posture: a leaked-cmdline window beats no backup at all). mongodump must +// still get --archive and must NOT get --config. +func TestRealMongoDumpRunner_FailOpenURIInArgv(t *testing.T) { + failMongoCfgCreateTemp(t) + dir := installFakeBinary(t, "mongodump", "", "") + connURL := "mongodb://admin:pw@m.host:27017/app" + + var out bytes.Buffer + if err := (realMongoDumpRunner{}).Run(context.Background(), connURL, &out); err != nil { + t.Fatalf("Run: %v", err) + } + if out.String() != "fakebody" { + t.Errorf("stdout = %q; want fakebody", out.String()) + } + argv := readArgv(t, dir) + sawURI, sawArchive, sawConfig := false, false, false + for i, a := range argv { + switch a { + case "--uri": + sawURI = true + if i+1 >= len(argv) || argv[i+1] != connURL { + t.Errorf("--uri not followed by connURL; argv = %q", argv) + } + case "--archive": + sawArchive = true + case "--config": + sawConfig = true + } + } + if !sawURI || !sawArchive { + t.Errorf("fail-open mongodump argv = %q; want --uri --archive", argv) + } + if sawConfig { + t.Errorf("fail-open mongodump argv unexpectedly carries --config: %q", argv) + } +} + +// TestRealMongoDumpRunner_ExecError — a non-zero mongodump exit is wrapped +// with the captured stderr so the backup row's error_summary is actionable. +func TestRealMongoDumpRunner_ExecError(t *testing.T) { + installFakeFailingBinary(t, "mongodump", "boom-mongodump-stderr") + + var out bytes.Buffer + err := (realMongoDumpRunner{}).Run(context.Background(), "mongodb://h:27017/db", &out) + if err == nil { + t.Fatal("Run err = nil; want wrapped exec error") + } + if !strings.Contains(err.Error(), "mongodump:") { + t.Errorf("err = %q; want it to name mongodump", err) + } + if !strings.Contains(err.Error(), "boom-mongodump-stderr") { + t.Errorf("err = %q; want it to carry the captured stderr", err) + } +} + +// TestWriteMongoConfig_ErrorArms — each filesystem failure arm returns a +// distinctly wrapped error, an empty path, a callable no-op cleanup, and (for +// arms past CreateTemp) removes the partially written temp file. +func TestWriteMongoConfig_ErrorArms(t *testing.T) { + // captureTempName wraps the real CreateTemp so the test can assert the + // temp file is gone after a downstream arm fails. + captureTempName := func(t *testing.T, name *string) { + t.Helper() + orig := mongoCfgCreateTemp + mongoCfgCreateTemp = func() (*os.File, error) { + f, err := orig() + if f != nil { + *name = f.Name() + } + return f, err + } + t.Cleanup(func() { mongoCfgCreateTemp = orig }) + } + + t.Run("create_temp", func(t *testing.T) { + failMongoCfgCreateTemp(t) + path, cleanup, err := writeMongoConfig("mongodb://h/db") + if err == nil || !strings.Contains(err.Error(), "create mongodump config") { + t.Fatalf("err = %v; want create mongodump config error", err) + } + if path != "" { + t.Errorf("path = %q; want empty", path) + } + cleanup() // must be a callable no-op, never nil + }) + + t.Run("chmod", func(t *testing.T) { + var tmpName string + captureTempName(t, &tmpName) + orig := mongoCfgChmod + mongoCfgChmod = func(*os.File) error { return errors.New("chmod denied (injected)") } + t.Cleanup(func() { mongoCfgChmod = orig }) + + path, cleanup, err := writeMongoConfig("mongodb://h/db") + if err == nil || !strings.Contains(err.Error(), "chmod mongodump config") { + t.Fatalf("err = %v; want chmod mongodump config error", err) + } + if path != "" { + t.Errorf("path = %q; want empty", path) + } + cleanup() + if tmpName == "" { + t.Fatal("seam never observed a temp file name") + } + if _, statErr := os.Stat(tmpName); !os.IsNotExist(statErr) { + t.Errorf("temp file %s still present after chmod-arm failure (stat err %v); want removed", tmpName, statErr) + } + }) + + t.Run("write", func(t *testing.T) { + var tmpName string + captureTempName(t, &tmpName) + orig := mongoCfgWriteURI + mongoCfgWriteURI = func(*os.File, string) error { return errors.New("disk full (injected)") } + t.Cleanup(func() { mongoCfgWriteURI = orig }) + + path, cleanup, err := writeMongoConfig("mongodb://h/db") + if err == nil || !strings.Contains(err.Error(), "write mongodump config") { + t.Fatalf("err = %v; want write mongodump config error", err) + } + if path != "" { + t.Errorf("path = %q; want empty", path) + } + cleanup() + if _, statErr := os.Stat(tmpName); !os.IsNotExist(statErr) { + t.Errorf("temp file %s still present after write-arm failure (stat err %v); want removed", tmpName, statErr) + } + }) + + t.Run("sync", func(t *testing.T) { + var tmpName string + captureTempName(t, &tmpName) + orig := mongoCfgSync + mongoCfgSync = func(*os.File) error { return errors.New("fsync io error (injected)") } + t.Cleanup(func() { mongoCfgSync = orig }) + + path, cleanup, err := writeMongoConfig("mongodb://h/db") + if err == nil || !strings.Contains(err.Error(), "sync mongodump config") { + t.Fatalf("err = %v; want sync mongodump config error", err) + } + if path != "" { + t.Errorf("path = %q; want empty", path) + } + cleanup() + if _, statErr := os.Stat(tmpName); !os.IsNotExist(statErr) { + t.Errorf("temp file %s still present after sync-arm failure (stat err %v); want removed", tmpName, statErr) + } + }) +} + +// TestRealRedisDumpRunner_TLSFlag — a rediss:// URL must add --tls to the +// redis-cli invocation (and still keep the password out of argv). +func TestRealRedisDumpRunner_TLSFlag(t *testing.T) { + dir := installFakeBinary(t, "redis-cli", "REDISCLI_AUTH", "auth.txt") + const secret = "tls-redis-PW" + connURL := "rediss://:" + secret + "@r.tls.host:6380" + + var out bytes.Buffer + if err := (realRedisDumpRunner{}).Run(context.Background(), connURL, &out); err != nil { + t.Fatalf("Run: %v", err) + } + argv := readArgv(t, dir) + sawTLS := false + for _, a := range argv { + if a == "--tls" { + sawTLS = true + } + if strings.Contains(a, secret) { + t.Errorf("argv leaks redis password: %q (full argv: %q)", a, argv) + } + } + if !sawTLS { + t.Errorf("rediss:// invocation missing --tls; argv = %q", argv) + } + if auth := mustReadString(t, filepath.Join(dir, "auth.txt")); auth != secret { + t.Errorf("REDISCLI_AUTH = %q; want %q", auth, secret) + } +} + +// TestRealRedisDumpRunner_FailOpenRawURI — an unparseable/unexpected-scheme +// URL falls open to `-u --rdb -` (secret may sit in argv; documented +// posture: strictly better than no backup). +func TestRealRedisDumpRunner_FailOpenRawURI(t *testing.T) { + dir := installFakeBinary(t, "redis-cli", "", "") + connURL := "http://not-a-redis-scheme:80" // splitRedisURL rejects the scheme + + var out bytes.Buffer + if err := (realRedisDumpRunner{}).Run(context.Background(), connURL, &out); err != nil { + t.Fatalf("Run: %v", err) + } + if out.String() != "fakebody" { + t.Errorf("stdout = %q; want fakebody", out.String()) + } + argv := readArgv(t, dir) + want := []string{"-u", connURL, "--rdb", "-"} + if len(argv) != len(want) { + t.Fatalf("fail-open argv = %q; want %q", argv, want) + } + for i := range want { + if argv[i] != want[i] { + t.Fatalf("fail-open argv = %q; want %q", argv, want) + } + } +} + +// TestRealRedisDumpRunner_ExecError — non-zero redis-cli exit is wrapped with +// the captured stderr. +func TestRealRedisDumpRunner_ExecError(t *testing.T) { + installFakeFailingBinary(t, "redis-cli", "boom-redis-stderr") + + var out bytes.Buffer + err := (realRedisDumpRunner{}).Run(context.Background(), "redis://:pw@h:6379", &out) + if err == nil { + t.Fatal("Run err = nil; want wrapped exec error") + } + if !strings.Contains(err.Error(), "redis-cli --rdb") { + t.Errorf("err = %q; want it to name redis-cli --rdb", err) + } + if !strings.Contains(err.Error(), "boom-redis-stderr") { + t.Errorf("err = %q; want it to carry the captured stderr", err) + } +} + +// TestRealMongoRestoreRunner_FailOpenURIInArgv — when the config file can't be +// written, mongorestore falls open to `--uri --archive --drop`. +func TestRealMongoRestoreRunner_FailOpenURIInArgv(t *testing.T) { + failMongoCfgCreateTemp(t) + dir := installFakeBinary(t, "mongorestore", "", "") + connURL := "mongodb://admin:pw@m.host:27017/app" + + if err := (realMongoRestoreRunner{}).Run(context.Background(), connURL, bytes.NewReader([]byte("archive-bytes"))); err != nil { + t.Fatalf("Run: %v", err) + } + argv := readArgv(t, dir) + sawURI, sawArchive, sawDrop, sawConfig := false, false, false, false + for i, a := range argv { + switch a { + case "--uri": + sawURI = true + if i+1 >= len(argv) || argv[i+1] != connURL { + t.Errorf("--uri not followed by connURL; argv = %q", argv) + } + case "--archive": + sawArchive = true + case "--drop": + sawDrop = true + case "--config": + sawConfig = true + } + } + if !sawURI || !sawArchive || !sawDrop { + t.Errorf("fail-open mongorestore argv = %q; want --uri --archive --drop", argv) + } + if sawConfig { + t.Errorf("fail-open mongorestore argv unexpectedly carries --config: %q", argv) + } +} + +// TestRealMongoRestoreRunner_ExecError — non-zero mongorestore exit is wrapped +// with the captured stderr so the restore row's error_summary is actionable. +func TestRealMongoRestoreRunner_ExecError(t *testing.T) { + installFakeFailingBinary(t, "mongorestore", "boom-mongorestore-stderr") + + err := (realMongoRestoreRunner{}).Run(context.Background(), "mongodb://h:27017/db", bytes.NewReader([]byte("archive-bytes"))) + if err == nil { + t.Fatal("Run err = nil; want wrapped exec error") + } + if !strings.Contains(err.Error(), "mongorestore:") { + t.Errorf("err = %q; want it to name mongorestore", err) + } + if !strings.Contains(err.Error(), "boom-mongorestore-stderr") { + t.Errorf("err = %q; want it to carry the captured stderr", err) + } +} diff --git a/internal/jobs/backup_dump_test.go b/internal/jobs/backup_dump_test.go new file mode 100644 index 0000000..f5ebb22 --- /dev/null +++ b/internal/jobs/backup_dump_test.go @@ -0,0 +1,329 @@ +package jobs + +import ( + "bytes" + "context" + "io" + "os" + "path/filepath" + "runtime" + "testing" +) + +// fakeMongoDump / fakeRedisDump mirror fakePgDump (customer_backup_runner_test.go): +// they record the connURL they were handed and write a fixed payload so the +// runner's gzip+sha+upload pipeline can be exercised without a live Mongo/Redis. +type fakeMongoDump struct { + payload []byte + err error + gotConn string +} + +func (f *fakeMongoDump) Run(_ context.Context, connURL string, w io.Writer) error { + f.gotConn = connURL + if f.err != nil { + return f.err + } + _, err := w.Write(f.payload) + return err +} + +type fakeRedisDump struct { + payload []byte + err error + gotConn string +} + +func (f *fakeRedisDump) Run(_ context.Context, connURL string, w io.Writer) error { + f.gotConn = connURL + if f.err != nil { + return f.err + } + _, err := w.Write(f.payload) + return err +} + +// TestBackupSupportedResourceType pins the single source of truth for "what's +// backed up". A regression that drops mongodb/redis (or adds an unsupported +// type) trips here. The scheduler SQL filter + runner dispatch both anchor on +// this predicate. +func TestBackupSupportedResourceType(t *testing.T) { + supported := []string{"postgres", "vector", "mongodb", "redis"} + for _, rt := range supported { + if !backupSupportedResourceType(rt) { + t.Errorf("backupSupportedResourceType(%q) = false; want true (R2 ladder must cover it)", rt) + } + } + for _, rt := range []string{"webhook", "queue", "storage", "deploy", "", "nosql"} { + if backupSupportedResourceType(rt) { + t.Errorf("backupSupportedResourceType(%q) = true; want false", rt) + } + } +} + +// TestDumpForResourceType_Dispatch — the runner picks the right dump strategy +// per resource_type, returns nil + reason for unsupported types, and nil + +// reason when a strategy is unconfigured (misconfigured boot). +func TestDumpForResourceType_Dispatch(t *testing.T) { + pg := &fakePgDump{payload: []byte("pg")} + mongo := &fakeMongoDump{payload: []byte("mongo")} + redis := &fakeRedisDump{payload: []byte("redis")} + w := &CustomerBackupRunnerWorker{pgDump: pg, mongoDump: mongo, redisDump: redis} + + for _, rt := range []string{"postgres", "vector", "mongodb", "redis"} { + fn, reason := w.dumpForResourceType(rt) + if fn == nil { + t.Errorf("dumpForResourceType(%q) = nil (reason %q); want a dumper", rt, reason) + } + if reason != "" { + t.Errorf("dumpForResourceType(%q) reason = %q; want empty", rt, reason) + } + } + + // Unsupported type → nil + reason. + if fn, reason := w.dumpForResourceType("webhook"); fn != nil || reason == "" { + t.Errorf("dumpForResourceType(webhook) = (%v, %q); want (nil, non-empty)", fn != nil, reason) + } + + // Nil mongo strategy (misconfigured boot) → nil + reason, never panic. + wNoMongo := &CustomerBackupRunnerWorker{pgDump: pg} + if fn, reason := wNoMongo.dumpForResourceType("mongodb"); fn != nil || reason == "" { + t.Errorf("nil mongoDump: got (%v, %q); want (nil, non-empty)", fn != nil, reason) + } + if fn, reason := wNoMongo.dumpForResourceType("redis"); fn != nil || reason == "" { + t.Errorf("nil redisDump: got (%v, %q); want (nil, non-empty)", fn != nil, reason) + } +} + +// TestSplitRedisURL — host/port/password/tls extraction so redis-cli can run +// with the password out of argv (REDISCLI_AUTH). +func TestSplitRedisURL(t *testing.T) { + cases := []struct { + name string + in string + host string + port string + password string + tls bool + expectErr bool + }{ + {"full", "redis://:s3cr3t@redis.host:6380/0", "redis.host", "6380", "s3cr3t", false, false}, + {"default_port", "redis://:pw@h", "h", "6379", "pw", false, false}, + {"no_password", "redis://h:6379", "h", "6379", "", false, false}, + {"tls", "rediss://:pw@h:6380", "h", "6380", "pw", true, false}, + {"user_and_pw", "redis://user:pw@h:6379", "h", "6379", "pw", false, false}, + {"bad_scheme", "http://h:80", "", "", "", false, true}, + {"missing_host", "redis://", "", "", "", false, true}, + {"unparseable", "::::not a url", "", "", "", false, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + host, port, password, tls, err := splitRedisURL(c.in) + if c.expectErr { + if err == nil { + t.Fatalf("splitRedisURL(%q) err = nil; want error", c.in) + } + return + } + if err != nil { + t.Fatalf("splitRedisURL(%q) err = %v; want nil", c.in, err) + } + if host != c.host || port != c.port || password != c.password || tls != c.tls { + t.Errorf("splitRedisURL(%q) = (%q,%q,%q,%v); want (%q,%q,%q,%v)", + c.in, host, port, password, tls, c.host, c.port, c.password, c.tls) + } + }) + } +} + +// TestWriteMongoConfig — the mongodump config file is 0600 and carries the +// URI as a quoted YAML scalar so the password never reaches argv. +func TestWriteMongoConfig(t *testing.T) { + const uri = "mongodb://u:p@h:27017/db?authSource=admin" + path, cleanup, err := writeMongoConfig(uri) + if err != nil { + t.Fatalf("writeMongoConfig: %v", err) + } + defer cleanup() + + info, statErr := os.Stat(path) + if statErr != nil { + t.Fatalf("stat config: %v", statErr) + } + if runtime.GOOS != "windows" && info.Mode().Perm() != 0o600 { + t.Errorf("config perm = %o; want 0600 (URI carries the password)", info.Mode().Perm()) + } + body, readErr := os.ReadFile(path) + if readErr != nil { + t.Fatalf("read config: %v", readErr) + } + if !bytes.Contains(body, []byte("uri:")) { + t.Errorf("config missing uri: key; got %q", body) + } + if !bytes.Contains(body, []byte(uri)) { + t.Errorf("config missing the URI value; got %q", body) + } + + // cleanup removes the file. + cleanup() + if _, statErr := os.Stat(path); !os.IsNotExist(statErr) { + t.Errorf("config file still present after cleanup: %v", statErr) + } +} + +// installFakeBinary writes a shell-script stand-in for an external CLI +// (mongodump / mongorestore / redis-cli) into a TempDir, prepends it to PATH, +// and returns the dir. The script records argv to /argv.txt and the +// named env var's value to /, writes a tiny stdout payload, and +// exits 0. Mirrors installFakePgDump (customer_backup_runner_test.go) so the +// secret-hygiene branches in backup_dump.go can be exercised without the real +// binaries. +func installFakeBinary(t *testing.T, name, envVar, envFile string) (dir string) { + t.Helper() + if runtime.GOOS == "windows" { + t.Skip("fake CLI script is shell-based; worker runs on linux/darwin only") + } + dir = t.TempDir() + script := "#!/bin/sh\n" + + "printf '%s\\n' \"$@\" > \"" + dir + "/argv.txt\"\n" + if envVar != "" { + script += "printf '%s' \"${" + envVar + ":-}\" > \"" + dir + "/" + envFile + "\"\n" + } + script += "printf 'fakebody'\n" + + "exit 0\n" + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte(script), 0o755); err != nil { + t.Fatalf("write fake %s: %v", name, err) + } + t.Setenv("PATH", dir+string(os.PathListSeparator)+os.Getenv("PATH")) + return dir +} + +func readArgv(t *testing.T, dir string) []string { + t.Helper() + b, err := os.ReadFile(filepath.Join(dir, "argv.txt")) + if err != nil { + t.Fatalf("read argv.txt: %v", err) + } + s := string(bytes.TrimRight(b, "\n")) + if s == "" { + return nil + } + parts := bytes.Split([]byte(s), []byte("\n")) + out := make([]string, len(parts)) + for i, p := range parts { + out[i] = string(p) + } + return out +} + +// TestRealMongoDumpRunner_ConfigFileKeepsURIOutOfArgv — mongodump is invoked +// with --config and --archive; the URI (with password) must NOT appear +// in argv (it lives in the 0600 config file instead). +func TestRealMongoDumpRunner_ConfigFileKeepsURIOutOfArgv(t *testing.T) { + dir := installFakeBinary(t, "mongodump", "", "") + const secret = "mongo-secret-PW" + connURL := "mongodb://admin:" + secret + "@m.host:27017/app?authSource=admin" + + var out bytes.Buffer + if err := (realMongoDumpRunner{}).Run(context.Background(), connURL, &out); err != nil { + t.Fatalf("Run: %v", err) + } + if out.String() != "fakebody" { + t.Errorf("stdout = %q; want fakebody", out.String()) + } + argv := readArgv(t, dir) + + sawConfig, sawArchive := false, false + for _, a := range argv { + if a == "--config" { + sawConfig = true + } + if a == "--archive" { + sawArchive = true + } + if bytes.Contains([]byte(a), []byte(secret)) { + t.Errorf("argv leaks mongo password: %q (full argv: %q)", a, argv) + } + // The runner must NOT pass --gzip (the pipeline gzips; double-gzip + // would break the gunzip→mongorestore restore symmetry). + if a == "--gzip" { + t.Errorf("mongodump invoked with --gzip; the pipeline owns compression (double-gzip): %q", argv) + } + } + if !sawConfig { + t.Errorf("mongodump argv missing --config (URI must be out of argv): %q", argv) + } + if !sawArchive { + t.Errorf("mongodump argv missing --archive: %q", argv) + } +} + +// TestRealRedisDumpRunner_PasswordViaEnv — redis-cli is invoked with +// -h/-p/--rdb - and the password is passed via REDISCLI_AUTH env, NOT argv. +func TestRealRedisDumpRunner_PasswordViaEnv(t *testing.T) { + dir := installFakeBinary(t, "redis-cli", "REDISCLI_AUTH", "auth.txt") + const secret = "redis-secret-PW" + connURL := "redis://:" + secret + "@r.host:6380/0" + + var out bytes.Buffer + if err := (realRedisDumpRunner{}).Run(context.Background(), connURL, &out); err != nil { + t.Fatalf("Run: %v", err) + } + if out.String() != "fakebody" { + t.Errorf("stdout = %q; want fakebody", out.String()) + } + + argv := readArgv(t, dir) + auth := mustReadString(t, filepath.Join(dir, "auth.txt")) + if auth != secret { + t.Errorf("REDISCLI_AUTH = %q; want %q", auth, secret) + } + for _, a := range argv { + if bytes.Contains([]byte(a), []byte(secret)) { + t.Errorf("argv leaks redis password: %q (full argv: %q)", a, argv) + } + } + // Must carry the host/port + the `--rdb -` stream-to-stdout flag. + wantFlags := map[string]bool{"-h": false, "r.host": false, "-p": false, "6380": false, "--rdb": false, "-": false} + for _, a := range argv { + if _, ok := wantFlags[a]; ok { + wantFlags[a] = true + } + } + for flag, seen := range wantFlags { + if !seen { + t.Errorf("redis-cli argv missing %q; got %q", flag, argv) + } + } +} + +// TestRealMongoRestoreRunner_DropAndArchive — mongorestore must run with +// --archive + --drop (the rewind semantic) and keep the URI out of argv. +func TestRealMongoRestoreRunner_DropAndArchive(t *testing.T) { + dir := installFakeBinary(t, "mongorestore", "", "") + const secret = "mongo-restore-PW" + connURL := "mongodb://admin:" + secret + "@m.host:27017/app" + + if err := (realMongoRestoreRunner{}).Run(context.Background(), connURL, bytes.NewReader([]byte("archive-bytes"))); err != nil { + t.Fatalf("Run: %v", err) + } + argv := readArgv(t, dir) + sawDrop, sawArchive, sawConfig := false, false, false + for _, a := range argv { + switch a { + case "--drop": + sawDrop = true + case "--archive": + sawArchive = true + case "--config": + sawConfig = true + } + if bytes.Contains([]byte(a), []byte(secret)) { + t.Errorf("mongorestore argv leaks password: %q (full argv: %q)", a, argv) + } + } + if !sawDrop || !sawArchive || !sawConfig { + t.Errorf("mongorestore argv = %q; want --config + --archive + --drop", argv) + } +} diff --git a/internal/jobs/customer_backup_mongo_redis_test.go b/internal/jobs/customer_backup_mongo_redis_test.go new file mode 100644 index 0000000..1ca561b --- /dev/null +++ b/internal/jobs/customer_backup_mongo_redis_test.go @@ -0,0 +1,189 @@ +package jobs + +import ( + "context" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/testutil" + + "instant.dev/worker/internal/metrics" +) + +// expectBackupRetentionTail mocks the trailing per-tick retention sweep: one +// empty SELECT per tier name (five tier names in the fallback list). Mirrors +// the tail every other runner test mocks. The keep-last-N sweep that follows +// is deliberately NOT mocked — sqlmock returns an error for the unmocked +// query, which runKeepLastNSweep swallows (fail-soft), exactly as the +// pre-existing TestRunner_HappyPath relies on. +func expectBackupRetentionTail(mock sqlmock.Sqlmock) { + for i := 0; i < 5; i++ { + mock.ExpectQuery(`SELECT id::text, s3_key\s+FROM resource_backups`). + WillReturnRows(sqlmock.NewRows([]string{"id", "s3_key"})) + } +} + +// runBackupHappyPath drives one resource of the given type through the runner +// and asserts (a) the right dumper was hit with the decrypted conn and (b) the +// gzip+upload landed one object. Shared by the mongo + redis tests so the +// pipeline contract is asserted identically across types. +func runBackupHappyPath(t *testing.T, resourceType string, dumper interface { + connSeen() string +}) { + t.Helper() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + backupID := "11111111-1111-1111-1111-111111111111" + resID := "22222222-2222-2222-2222-222222222222" + teamID := uuid.MustParse("33333333-3333-3333-3333-333333333333") + token := "tok-" + resourceType + plainConn := "scheme://u:p@host/db" + encConn := encryptForTest(t, plainConn) + + mock.ExpectExec(`UPDATE resource_backups\s+SET status = 'pending'`). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(`SELECT b.id::text, b.resource_id::text, b.tier_at_backup`). + WithArgs(backupBatchSize). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "resource_id", "tier_at_backup", "backup_kind", + "token", "connection_url", "resource_type", "team_id", + }).AddRow(backupID, resID, "pro", "scheduled", token, encConn, resourceType, teamID)) + mock.ExpectQuery(`UPDATE resource_backups\s+SET status = 'running'`). + WithArgs(backupID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(backupID)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`UPDATE resource_backups\s+SET status = 'ok'`). + WithArgs(backupID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + expectBackupRetentionTail(mock) + + store := newFakeBackupStore() + w := &CustomerBackupRunnerWorker{ + db: db, + store: store, + pgDump: &fakePgDump{err: errAssertNotPg}, + bucket: "instant-shared", + prefix: "backups", + aesKey: testAESKeyHex, + now: time.Now, + timeout: time.Minute, + batchN: backupBatchSize, + } + switch resourceType { + case resourceTypeMongoDB: + w.mongoDump = dumper.(*fakeMongoDump) + case resourceTypeRedis: + w.redisDump = dumper.(*fakeRedisDump) + } + + before := testutil.ToFloat64(metrics.CustomerBackupByTypeTotal.WithLabelValues(resourceType, "ok")) + + if err := w.Work(context.Background(), fakeRunnerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } + if got := dumper.connSeen(); got != plainConn { + t.Errorf("%s dumper.connSeen() = %q; want decrypted %q", resourceType, got, plainConn) + } + wantKey := "backups/" + token + "/" + backupID + ".dump.gz" + if len(store.uploads) != 1 || store.uploads[0].key != wantKey { + t.Fatalf("%s uploads = %+v; want one object keyed %q", resourceType, store.uploads, wantKey) + } + after := testutil.ToFloat64(metrics.CustomerBackupByTypeTotal.WithLabelValues(resourceType, "ok")) + if after != before+1 { + t.Errorf("CustomerBackupByTypeTotal{%s,ok} = %v; want %v (+1)", resourceType, after, before+1) + } +} + +// errAssertNotPg is wired into the pg dumper for the mongo/redis happy-path +// tests: if the runner mis-dispatches a mongodb/redis row to pg_dump, the +// failure surfaces loudly rather than silently producing a pg backup. +var errAssertNotPg = errPgDispatchGuard{} + +type errPgDispatchGuard struct{} + +func (errPgDispatchGuard) Error() string { + return "pg dumper was invoked for a non-postgres resource_type — dispatch regressed" +} + +// connSeen adapters so runBackupHappyPath can read back the conn the fake +// dumper recorded without a type switch at the call site. +func (f *fakeMongoDump) connSeen() string { return f.gotConn } +func (f *fakeRedisDump) connSeen() string { return f.gotConn } + +// TestRunner_MongoHappyPath — a mongodb resource is dumped via the mongo +// strategy (NOT pg_dump), gzipped, uploaded, finalized, and the per-type +// metric is incremented. This is the R2 core proof for Mongo. +func TestRunner_MongoHappyPath(t *testing.T) { + runBackupHappyPath(t, resourceTypeMongoDB, &fakeMongoDump{payload: []byte("BSONARCHIVE")}) +} + +// TestRunner_RedisHappyPath — a redis resource is dumped via the redis +// strategy (NOT pg_dump), gzipped, uploaded, finalized, per-type metric +// incremented. R2 core proof for Redis. +func TestRunner_RedisHappyPath(t *testing.T) { + runBackupHappyPath(t, resourceTypeRedis, &fakeRedisDump{payload: []byte("REDIS0011RDBBODY")}) +} + +// TestRunner_UnsupportedType_MarksFailed — a manual backup against a type the +// ladder can't dump (e.g. a webhook resource) marks the row failed with a +// 'config' reason rather than panicking or hanging. Defence-in-depth: the +// scheduler SQL never enqueues these, but a direct api call could. +func TestRunner_UnsupportedType_MarksFailed(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + backupID := "11111111-1111-1111-1111-111111111111" + resID := "22222222-2222-2222-2222-222222222222" + teamID := uuid.MustParse("33333333-3333-3333-3333-333333333333") + encConn := encryptForTest(t, "scheme://u:p@host/db") + + mock.ExpectExec(`UPDATE resource_backups\s+SET status = 'pending'`). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(`SELECT b.id::text`). + WithArgs(backupBatchSize). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "resource_id", "tier_at_backup", "backup_kind", + "token", "connection_url", "resource_type", "team_id", + }).AddRow(backupID, resID, "pro", "manual", "tok", encConn, "webhook", teamID)) + mock.ExpectQuery(`UPDATE resource_backups\s+SET status = 'running'`). + WithArgs(backupID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(backupID)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + // markFailed path: status=failed + audit row. + mock.ExpectExec(`UPDATE resource_backups\s+SET status = 'failed'`). + WithArgs(backupID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + expectBackupRetentionTail(mock) + + w := &CustomerBackupRunnerWorker{ + db: db, + store: newFakeBackupStore(), + pgDump: &fakePgDump{payload: []byte("x")}, + bucket: "instant-shared", + prefix: "backups", + aesKey: testAESKeyHex, + now: time.Now, + timeout: time.Minute, + batchN: backupBatchSize, + } + if err := w.Work(context.Background(), fakeRunnerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} diff --git a/internal/jobs/customer_backup_runner.go b/internal/jobs/customer_backup_runner.go index e5364c1..c6a5ce6 100644 --- a/internal/jobs/customer_backup_runner.go +++ b/internal/jobs/customer_backup_runner.go @@ -145,16 +145,24 @@ func (realPgDumpRunner) Run(ctx context.Context, connURL string, w io.Writer) er // in production the constructor below requires both. type CustomerBackupRunnerWorker struct { river.WorkerDefaults[CustomerBackupRunnerArgs] - db *sql.DB - store BackupObjectStore - pgDump pgDumpRunner - bucket string - prefix string - aesKey string // hex, decoded at use site via crypto.ParseAESKey - plans BackupPlanRegistry - now func() time.Time - timeout time.Duration - batchN int + db *sql.DB + store BackupObjectStore + // Per-resource_type dump strategies. pgDump serves postgres/vector; + // mongoDump serves mongodb; redisDump serves redis. All three write a + // RAW (uncompressed) archive into the runner's gzip pipeline — see + // backup_dump.go's gzip contract. A nil strategy makes that + // resource_type a fail-open skip (markFailed reason="config"), never a + // panic. + pgDump pgDumpRunner + mongoDump mongoDumpRunner + redisDump redisDumpRunner + bucket string + prefix string + aesKey string // hex, decoded at use site via crypto.ParseAESKey + plans BackupPlanRegistry + now func() time.Time + timeout time.Duration + batchN int // apiBase / apiCli / jwtSecret — used by the FIX-H #65/#Q47 refund // path. When apiBase or jwtSecret is empty the refund call is a @@ -172,16 +180,18 @@ type CustomerBackupRunnerWorker struct { // logs a WARN; the sweep still runs but with a coarse policy. func NewCustomerBackupRunner(db *sql.DB, store BackupObjectStore, bucket, prefix, aesKey string, plans BackupPlanRegistry) *CustomerBackupRunnerWorker { return &CustomerBackupRunnerWorker{ - db: db, - store: store, - pgDump: realPgDumpRunner{}, - bucket: bucket, - prefix: prefix, - aesKey: aesKey, - plans: plans, - now: time.Now, - timeout: backupPerRunTimeout, - batchN: backupBatchSize, + db: db, + store: store, + pgDump: realPgDumpRunner{}, + mongoDump: realMongoDumpRunner{}, + redisDump: realRedisDumpRunner{}, + bucket: bucket, + prefix: prefix, + aesKey: aesKey, + plans: plans, + now: time.Now, + timeout: backupPerRunTimeout, + batchN: backupBatchSize, } } @@ -428,14 +438,29 @@ func (w *CustomerBackupRunnerWorker) processBackup(parentCtx context.Context, p return false } - // Step 3 — stream pg_dump → gzip → (sha256 + S3) via io.Pipe. + // Step 2.5 — resolve the per-resource_type dump strategy. R2 + // (2026-06-10): the runner backs up postgres/vector (pg_dump), mongodb + // (mongodump), and redis (redis-cli --rdb) through ONE pipeline. An + // unsupported type (or a nil strategy in a misconfigured boot) is a + // fail-open 'config' failure — never a panic. The scheduler's SQL filter + // only enqueues supported types, so this branch is defence-in-depth for + // a manual API backup against a type we don't dump. + dumpRun, unsupportedReason := w.dumpForResourceType(p.resourceType) + if dumpRun == nil { + w.markFailed(ctx, p.backupID, "config", unsupportedReason, start, p) + return false + } + + // Step 3 — stream → gzip → (sha256 + S3) via io.Pipe. // // FIX-H #59 — the gzip output is teed into a SHA-256 hasher so the // final hex digest is available at finalize time. We hash the - // COMPRESSED bytes (not the raw pg_dump output) because the - // compressed object is what lives in S3 and what the restore - // handler / runner will re-read for verification. Hashing happens - // inline on the writer side — no second pass over the bytes. + // COMPRESSED bytes (not the raw dump output) because the compressed + // object is what lives in S3 and what the restore handler / runner will + // re-read for verification. Hashing happens inline on the writer side — + // no second pass over the bytes. Every dump strategy writes RAW + // (uncompressed) bytes into the gzip writer (backup_dump.go gzip + // contract), so the pipeline is identical across resource types. objectKey := backupObjectKey(w.prefix, p.token, p.backupID) pr, pw := io.Pipe() hasher := sha256.New() @@ -453,15 +478,15 @@ func (w *CustomerBackupRunnerWorker) processBackup(parentCtx context.Context, p // the Upload reader sees EOF instead of blocking forever. defer func() { if r := recover(); r != nil { - panicErr := fmt.Errorf("pg_dump goroutine panicked: %v", r) + panicErr := fmt.Errorf("backup dump goroutine panicked: %v", r) _ = pw.CloseWithError(panicErr) dumpDone <- panicErr - LogRecoveredPanic("customer_backup_runner.pg_dump_pipe", r) + LogRecoveredPanic("customer_backup_runner.dump_pipe", r) } }() mw := io.MultiWriter(hasher, pw) gz := gzip.NewWriter(mw) - runErr := w.pgDump.Run(ctx, plainConn, gz) + runErr := dumpRun(ctx, plainConn, gz) // Close gzip first to flush the trailer, then the pipe so the // Upload side sees EOF (not just the partial gzip stream). If // pg_dump errored, propagate the close-error too. @@ -483,7 +508,7 @@ func (w *CustomerBackupRunnerWorker) processBackup(parentCtx context.Context, p // actionable: "pg_dump: connection refused" vs "pipe: io: read/write // on closed pipe"). if dumpErr != nil { - w.markFailed(ctx, p.backupID, backupFailReason(dumpErr), fmt.Sprintf("pg_dump failed: %v", dumpErr), start, p) + w.markFailed(ctx, p.backupID, backupFailReason(dumpErr), fmt.Sprintf("backup dump failed: %v", dumpErr), start, p) // Best-effort cleanup of a half-written object so we don't pay // for orphan bytes; failure to delete is logged but not fatal. if delErr := w.store.DeleteObject(parentCtx, w.bucket, objectKey); delErr != nil { @@ -545,6 +570,10 @@ func (w *CustomerBackupRunnerWorker) processBackup(parentCtx context.Context, p } metrics.CustomerBackupSucceededTotal.Inc() + // R2 (2026-06-10) per-resource_type breakdown — answers "is Mongo + // backing up but Redis silently failing?" which the aggregate counter + // can't. Labels: resource_type + result. + metrics.CustomerBackupByTypeTotal.WithLabelValues(p.resourceType, "ok").Inc() slog.Info("jobs.customer_backup_runner.succeeded", "backup_id", p.backupID, "resource_id", p.resourceID, @@ -627,6 +656,10 @@ func (w *CustomerBackupRunnerWorker) markFailed( // (reason="auth", PAGE) is distinguishable from a transient dump/upload // failure (retried next run). NR alert: customer-backup-failed.json. metrics.CustomerBackupFailedTotal.WithLabelValues(reason).Inc() + // R2 (2026-06-10) per-resource_type breakdown of the failure. Pairs with + // the "ok" counter on the success path so the dashboard can compute a + // per-type success ratio (e.g. Redis failing while Mongo is healthy). + metrics.CustomerBackupByTypeTotal.WithLabelValues(p.resourceType, "failed").Inc() // Two summaries: a SANITIZED, user-safe one persisted to the DB + audit // (it surfaces on the customer's failure email and the backup-health diff --git a/internal/jobs/customer_backup_scheduler.go b/internal/jobs/customer_backup_scheduler.go index 923e006..bb15019 100644 --- a/internal/jobs/customer_backup_scheduler.go +++ b/internal/jobs/customer_backup_scheduler.go @@ -1,6 +1,6 @@ // customer_backup_scheduler.go — periodic sweep that INSERTs a `pending` -// resource_backups row for every tier-eligible postgres/vector resource on -// the platform. +// resource_backups row for every tier-eligible postgres/vector/mongodb/redis +// resource on the platform (R2, 2026-06-10: grew from postgres/vector only). // // Why a scheduler at all (vs. cron triggering the api directly): the worker // already owns the DB connection, the audit_log writer, and the periodic-job @@ -68,7 +68,8 @@ func (CustomerBackupSchedulerArgs) Kind() string { return "customer_backup_sched // CustomerBackupSchedulerWorker scans the resources table once an hour and // inserts a resource_backups row in the 'pending' state for every active -// postgres/vector resource whose tier is due for a backup this hour. +// postgres/vector/mongodb/redis resource whose tier is due for a backup this +// hour. type CustomerBackupSchedulerWorker struct { river.WorkerDefaults[CustomerBackupSchedulerArgs] db *sql.DB @@ -207,11 +208,21 @@ func (w *CustomerBackupSchedulerWorker) Work(ctx context.Context, job *river.Job // dropped hobby_plus + every _yearly variant when first written; the // registry-driven path removes that single-site-list failure mode // entirely (root CLAUDE.md rule 18). + // + // R2 (2026-06-10) — the resource_type filter grew from ('postgres', + // 'vector') to also include ('mongodb', 'redis'). The product sells + // "backups + 1-click restore" for ALL paid resources, but pre-R2 the + // ladder backed up postgres/vector ONLY — Mongo/Redis had ZERO automated + // backup (worker #103 note + GAP-AUDIT-2026-06-10). The runner now has a + // mongodump + redis-cli dump strategy (backup_dump.go), so all four types + // are tier-gated/cadence-driven identically. The list is anchored on + // backupSupportedResourceType() so SQL and the runner's dispatch can't + // drift (root rule 16/18 — single source for "what's backed up"). rows, err := w.db.QueryContext(ctx, ` SELECT r.id::text, r.tier, r.team_id FROM resources r WHERE r.status = 'active' - AND r.resource_type IN ('postgres', 'vector') + AND r.resource_type IN ('postgres', 'vector', 'mongodb', 'redis') AND r.tier NOT IN ('anonymous', 'free') `) if err != nil { diff --git a/internal/jobs/customer_backup_scheduler_mongo_redis_test.go b/internal/jobs/customer_backup_scheduler_mongo_redis_test.go new file mode 100644 index 0000000..49fbc18 --- /dev/null +++ b/internal/jobs/customer_backup_scheduler_mongo_redis_test.go @@ -0,0 +1,144 @@ +package jobs + +import ( + "context" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" +) + +// TestScheduler_SQLFilterIncludesAllSupportedTypes pins R2: the candidate +// SELECT must enqueue postgres, vector, mongodb AND redis (was postgres/vector +// only). sqlmock's regexp matcher asserts the issued statement carries every +// supported resource_type — a regression that drops mongodb/redis from the +// filter fails this expectation loudly (root rule 18: registry/SQL drift +// guard). The fixture drives one pro mongodb row to also prove the row makes +// it through the cadence gate to an INSERT. +func TestScheduler_SQLFilterIncludesAllSupportedTypes(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + // The SELECT must name all four supported types. A drift back to + // postgres/vector-only fails this regex match. + mock.ExpectQuery(`resource_type IN \('postgres', 'vector', 'mongodb', 'redis'\)`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "pro", teamID)) + mock.ExpectExec(`INSERT INTO resource_backups`). + WithArgs(uuid.MustParse(resID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 6, 10, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("SQL filter does not include all four supported resource types: %v", err) + } +} + +// TestScheduler_EnqueuesMongoAndRedisForProTier — R2 core ask: a pro mongodb +// resource AND a pro redis resource each yield an INSERT (hourly cadence, +// same as pg). Drives both rows in one candidate set so the per-row cadence +// gate is exercised for the new types. The scheduler is type-agnostic post-R2 +// (cadence is tier-driven), so this proves the SQL now surfaces them and the +// gate lets them through. +func TestScheduler_EnqueuesMongoAndRedisForProTier(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + mongoID := "11111111-1111-2222-3333-444444444444" + redisID := "22222222-1111-2222-3333-444444444444" + + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(mongoID, "pro", teamID). + AddRow(redisID, "pro", teamID)) + // Both rows are pro (hourly) → both INSERT. + mock.ExpectExec(`INSERT INTO resource_backups`). + WithArgs(uuid.MustParse(mongoID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`INSERT INTO resource_backups`). + WithArgs(uuid.MustParse(redisID), "pro"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 6, 10, 9, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("mongo/redis pro rows not both enqueued hourly: %v", err) + } +} + +// TestScheduler_FreeTierMongoNeverEnqueued — R2 hard requirement preserved: +// a free-tier mongodb/redis resource is NEVER enqueued. The SQL excludes +// anonymous/free up front, so even with mongodb/redis now in the type filter +// the candidate set is empty and no INSERT is issued. +func TestScheduler_FreeTierMongoNeverEnqueued(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + // SQL filters anonymous/free → empty result even though the row is a + // free mongodb resource. + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"})) + // No INSERT expected. + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 6, 10, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestScheduler_FreeMongoGate_SkipsEvenIfRowLeaksThrough — defence-in-depth: +// even if a free mongodb row leaked past the SQL filter, the per-row registry +// cadence gate (cadenceForTier → cadenceNever) vetoes it. No INSERT. +func TestScheduler_FreeMongoGate_SkipsEvenIfRowLeaksThrough(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + teamID := uuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + resID := "fffffff0-1111-2222-3333-444444444444" + + mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`). + WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}). + AddRow(resID, "free", teamID)) + // No INSERT — registry cadence gate must veto the free row regardless of type. + + w := NewCustomerBackupSchedulerWorker(db, schedulerPlans()) + w.now = func() time.Time { return time.Date(2026, 6, 10, 14, 0, 0, 0, time.UTC) } + + if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("free mongo row was not skipped by the registry gate: %v", err) + } +} diff --git a/internal/jobs/customer_restore_mongo_redis_test.go b/internal/jobs/customer_restore_mongo_redis_test.go new file mode 100644 index 0000000..47f0ab2 --- /dev/null +++ b/internal/jobs/customer_restore_mongo_redis_test.go @@ -0,0 +1,194 @@ +package jobs + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" +) + +// fakeMongoRestore mirrors fakePgRestore: records the conn + gunzipped body +// so the test can assert the runner dispatched a mongodb row to mongorestore +// (not pg_restore) and that the gunzip step round-trips. +type fakeMongoRestore struct { + err error + gotConn string + gotBody []byte +} + +func (f *fakeMongoRestore) Run(_ context.Context, connURL string, r io.Reader) error { + f.gotConn = connURL + if f.err != nil { + return f.err + } + data, err := io.ReadAll(r) + if err != nil { + return err + } + f.gotBody = data + return nil +} + +// TestRestoreForResourceType_Dispatch — the restore runner picks pg_restore +// for postgres/vector, mongorestore for mongodb, and returns nil + an +// explicit reason for redis (not yet supported) and unknown types. +func TestRestoreForResourceType_Dispatch(t *testing.T) { + w := &CustomerRestoreRunnerWorker{ + pgRestore: &fakePgRestore{}, + mongoRestore: &fakeMongoRestore{}, + } + for _, rt := range []string{"postgres", "vector", "mongodb"} { + fn, reason := w.restoreForResourceType(rt) + if fn == nil { + t.Errorf("restoreForResourceType(%q) = nil (reason %q); want a restorer", rt, reason) + } + if reason != "" { + t.Errorf("restoreForResourceType(%q) reason = %q; want empty", rt, reason) + } + } + // Redis restore is the tracked R2 follow-up — nil + a customer-readable reason. + if fn, reason := w.restoreForResourceType("redis"); fn != nil || reason == "" { + t.Errorf("restoreForResourceType(redis) = (%v, %q); want (nil, non-empty follow-up reason)", fn != nil, reason) + } + // Unknown type → nil + reason. + if fn, reason := w.restoreForResourceType("webhook"); fn != nil || reason == "" { + t.Errorf("restoreForResourceType(webhook) = (%v, %q); want (nil, non-empty)", fn != nil, reason) + } + // Nil mongo restorer (misconfigured boot) → nil + reason, never panic. + wNoMongo := &CustomerRestoreRunnerWorker{pgRestore: &fakePgRestore{}} + if fn, reason := wNoMongo.restoreForResourceType("mongodb"); fn != nil || reason == "" { + t.Errorf("nil mongoRestore: got (%v, %q); want (nil, non-empty)", fn != nil, reason) + } +} + +// TestRestoreRunner_MongoHappyPath — a mongodb restore row downloads the +// gzipped archive, verifies sha256, gunzips, and feeds mongorestore (NOT +// pg_restore). R2 proof that "1-click restore" is real for Mongo. +func TestRestoreRunner_MongoHappyPath(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + restoreID := "rrrrrrr0-1111-2222-3333-444444444444" + resID := "22222222-2222-2222-2222-222222222222" + backupID := "11111111-1111-1111-1111-111111111111" + teamID := uuid.MustParse("33333333-3333-3333-3333-333333333333") + s3Key := "backups/tok-mongo/" + backupID + ".dump.gz" + plainConn := "mongodb://u:p@host:27017/db" + encConn := encryptForTest(t, plainConn) + + store := newFakeBackupStore() + payload := []byte("BSON-ARCHIVE-PAYLOAD") + gzBytes := gzipFor(t, payload) + store.objects["instant-shared/"+s3Key] = gzBytes + storedSHA := sha256Hex(gzBytes) + + mock.ExpectQuery(`SELECT rr.id::text`). + WithArgs(restoreBatchSize). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "resource_id", "backup_id", "s3_key", "sha256", + "connection_url", "resource_type", "token", "team_id", + }).AddRow(restoreID, resID, backupID, s3Key, storedSHA, encConn, "mongodb", "tok-mongo", teamID)) + mock.ExpectQuery(`UPDATE resource_restores\s+SET status = 'running'`). + WithArgs(restoreID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(restoreID)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`UPDATE resource_restores\s+SET status = 'ok'`). + WithArgs(restoreID). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + + mr := &fakeMongoRestore{} + // A pg restorer that errors if (incorrectly) invoked for a mongo row. + w := &CustomerRestoreRunnerWorker{ + db: db, + store: store, + pgRestore: &fakePgRestore{err: errPgDispatchGuard{}}, + mongoRestore: mr, + bucket: "instant-shared", + aesKey: testAESKeyHex, + now: time.Now, + timeout: time.Minute, + batchN: restoreBatchSize, + } + + if err := w.Work(context.Background(), fakeRestoreJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } + if mr.gotConn != plainConn { + t.Errorf("mongorestore conn = %q; want decrypted %q", mr.gotConn, plainConn) + } + if !bytes.Equal(mr.gotBody, payload) { + t.Errorf("mongorestore body = %q; want %q (gunzip step broken)", mr.gotBody, payload) + } +} + +// TestRestoreRunner_RedisUnsupported_MarksFailed — a redis restore row is +// marked failed with the explicit "not yet supported" reason BEFORE any S3 +// download (the dispatch guard runs early). Confirms the R2 follow-up posture: +// redis backups exist + are downloadable, but in-place restore isn't wired. +func TestRestoreRunner_RedisUnsupported_MarksFailed(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + restoreID := "rrrrrrr0-1111-2222-3333-444444444444" + resID := "22222222-2222-2222-2222-222222222222" + backupID := "11111111-1111-1111-1111-111111111111" + teamID := uuid.MustParse("33333333-3333-3333-3333-333333333333") + s3Key := "backups/tok-redis/" + backupID + ".dump.gz" + encConn := encryptForTest(t, "redis://:pw@host:6379/0") + + mock.ExpectQuery(`SELECT rr.id::text`). + WithArgs(restoreBatchSize). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "resource_id", "backup_id", "s3_key", "sha256", + "connection_url", "resource_type", "token", "team_id", + }).AddRow(restoreID, resID, backupID, s3Key, "", encConn, "redis", "tok-redis", teamID)) + mock.ExpectQuery(`UPDATE resource_restores\s+SET status = 'running'`). + WithArgs(restoreID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(restoreID)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + // markRestoreFailed path — status=failed + audit row. NO download/finalize. + mock.ExpectExec(`UPDATE resource_restores\s+SET status = 'failed'`). + WithArgs(restoreID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + + store := newFakeBackupStore() + store.objects["instant-shared/"+s3Key] = gzipFor(t, []byte("rdb")) + w := &CustomerRestoreRunnerWorker{ + db: db, + store: store, + pgRestore: &fakePgRestore{}, + mongoRestore: &fakeMongoRestore{}, + bucket: "instant-shared", + aesKey: testAESKeyHex, + now: time.Now, + timeout: time.Minute, + batchN: restoreBatchSize, + } + + if err := w.Work(context.Background(), fakeRestoreJob()); err != nil { + t.Fatalf("Work: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("redis restore was not marked failed with the unsupported reason: %v", err) + } + // The S3 object must NOT have been downloaded/deleted — dispatch guard + // runs before download. + if len(store.deletes) != 0 { + t.Errorf("redis restore touched S3 (%d deletes); guard should run before download", len(store.deletes)) + } +} diff --git a/internal/jobs/customer_restore_runner.go b/internal/jobs/customer_restore_runner.go index 031c84c..60fad7d 100644 --- a/internal/jobs/customer_restore_runner.go +++ b/internal/jobs/customer_restore_runner.go @@ -1,7 +1,14 @@ // customer_restore_runner.go — every 30s, claim up to 5 pending rows from -// resource_restores and run `pg_restore --clean --if-exists --no-owner --no-acl` -// streaming the gzip'd dump from S3 back into the SAME resource the backup -// came from. +// resource_restores and run the per-resource_type restore tool +// (postgres/vector → `pg_restore --clean --if-exists --no-owner --no-acl`; +// mongodb → `mongorestore --archive --drop`), streaming the gzip'd dump from +// S3 back into the SAME resource the backup came from. +// +// R2 (2026-06-10): mongodb restore added alongside the mongodb/redis backup +// support. Redis RESTORE is NOT yet wired — an RDB restore-in-place needs +// pod-level access the worker lacks; Redis backups are still taken + sha- +// verified + downloadable, and Redis restore is the tracked R2 follow-up +// (see restoreForResourceType). // // Why restore-into-same-resource only: backup objects in S3 are immutable; // the schema/data they encode is keyed to the resource_id at backup time. A @@ -91,28 +98,100 @@ func (realPgRestoreRunner) Run(ctx context.Context, connURL string, r io.Reader) return nil } +// mongoRestoreRunner mirrors pgRestoreRunner for the mongodb branch. Run +// reads the `mongodump --archive` BSON archive from r (already gunzipped by +// the caller) and applies it with `mongorestore --archive --drop` so the +// "rewind to this backup" semantics match pg_restore's --clean --if-exists. +type mongoRestoreRunner interface { + Run(ctx context.Context, connURL string, r io.Reader) error +} + +type realMongoRestoreRunner struct{} + +func (realMongoRestoreRunner) Run(ctx context.Context, connURL string, r io.Reader) error { + // --drop drops each collection before restoring it (the mongo analogue + // of pg_restore --clean --if-exists) so the restore is a true rewind, not + // a merge. Secret hygiene mirrors realMongoDumpRunner: pass the URI via a + // 0600 config file so the password stays out of argv; fail-open to --uri + // in argv on a temp-file error. + cfgPath, cleanup, cfgErr := writeMongoConfig(connURL) + var cmd *exec.Cmd + if cfgErr == nil { + defer cleanup() + cmd = exec.CommandContext(ctx, "mongorestore", + "--config", cfgPath, + "--archive", + "--drop", + ) + } else { + cmd = exec.CommandContext(ctx, "mongorestore", + "--uri", connURL, + "--archive", + "--drop", + ) + } + cmd.Stdin = r + var stderrBuf limitedBuffer + cmd.Stderr = &stderrBuf + if err := cmd.Run(); err != nil { + return fmt.Errorf("mongorestore: %w (stderr: %s)", err, stderrBuf.String()) + } + return nil +} + type CustomerRestoreRunnerWorker struct { river.WorkerDefaults[CustomerRestoreRunnerArgs] - db *sql.DB - store BackupObjectStore - pgRestore pgRestoreRunner - bucket string - aesKey string - now func() time.Time - timeout time.Duration - batchN int + db *sql.DB + store BackupObjectStore + // Per-resource_type restore strategies. pgRestore serves postgres/vector; + // mongoRestore serves mongodb. Redis restore is NOT yet wired (see + // restoreForResourceType) — RDB restore-in-place needs pod-level access + // the worker doesn't have; tracked as the R2 follow-up. + pgRestore pgRestoreRunner + mongoRestore mongoRestoreRunner + bucket string + aesKey string + now func() time.Time + timeout time.Duration + batchN int } func NewCustomerRestoreRunner(db *sql.DB, store BackupObjectStore, bucket, aesKey string) *CustomerRestoreRunnerWorker { return &CustomerRestoreRunnerWorker{ - db: db, - store: store, - pgRestore: realPgRestoreRunner{}, - bucket: bucket, - aesKey: aesKey, - now: time.Now, - timeout: restorePerRunTimeout, - batchN: restoreBatchSize, + db: db, + store: store, + pgRestore: realPgRestoreRunner{}, + mongoRestore: realMongoRestoreRunner{}, + bucket: bucket, + aesKey: aesKey, + now: time.Now, + timeout: restorePerRunTimeout, + batchN: restoreBatchSize, + } +} + +// restoreForResourceType returns the restore Run func for the given +// resource_type, or nil + a reason when restore isn't supported for that +// type. postgres/vector → pg_restore; mongodb → mongorestore. redis returns +// nil with an explicit "not yet supported" reason: an RDB restore-in-place +// requires replacing dump.rdb on the redis pod + a restart (or a per-key +// RESTORE pass that parses the RDB), neither of which the worker can drive +// from outside the pod. The Redis BACKUP ships in R2; Redis RESTORE is the +// tracked follow-up. Mirrors dumpForResourceType on the backup runner so the +// dispatch logic is symmetric and unit-testable. +func (w *CustomerRestoreRunnerWorker) restoreForResourceType(resourceType string) (func(ctx context.Context, connURL string, r io.Reader) error, string) { + switch resourceType { + case resourceTypePostgres, resourceTypeVector: + return w.pgRestore.Run, "" + case resourceTypeMongoDB: + if w.mongoRestore == nil { + return nil, "mongo restore runner not configured" + } + return w.mongoRestore.Run, "" + case resourceTypeRedis: + return nil, "redis restore not yet supported (RDB restore-in-place requires pod-level access; tracked as R2 follow-up — backups are still taken and downloadable)" + default: + return nil, fmt.Sprintf("unsupported resource_type %q for restore", resourceType) } } @@ -305,6 +384,18 @@ func (w *CustomerRestoreRunnerWorker) processRestore(parentCtx context.Context, }) } + // Resolve the per-resource_type restore strategy BEFORE downloading the + // (potentially multi-GB) object — no point streaming a Redis RDB from S3 + // only to discover restore isn't wired for it. postgres/vector → + // pg_restore; mongodb → mongorestore; redis → not yet supported (R2 + // follow-up). An unsupported type marks the row failed with an explicit, + // customer-readable reason rather than silently hanging. + restoreRun, unsupportedReason := w.restoreForResourceType(p.resourceType) + if restoreRun == nil { + w.markRestoreFailed(ctx, p.restoreID, unsupportedReason, start, p) + return false + } + // Validate backup is still present (retention sweep may have nulled // s3_key out from under the api's check, in the race between the // /restore POST and the runner picking it up). @@ -414,8 +505,8 @@ func (w *CustomerRestoreRunnerWorker) processRestore(parentCtx context.Context, } defer func() { _ = gzReader.Close() }() - if runErr := w.pgRestore.Run(ctx, plainConn, gzReader); runErr != nil { - w.markRestoreFailed(ctx, p.restoreID, fmt.Sprintf("pg_restore failed: %v", runErr), start, p) + if runErr := restoreRun(ctx, plainConn, gzReader); runErr != nil { + w.markRestoreFailed(ctx, p.restoreID, fmt.Sprintf("restore failed: %v", runErr), start, p) return false } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 2397fb6..e03c22a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -93,6 +93,25 @@ var ( Help: "Customer backup runs that completed and were stored in S3.", }) + // CustomerBackupByTypeTotal is the per-resource_type breakdown of customer + // backup outcomes, added when the backup ladder grew from postgres/vector + // to also cover mongodb + redis (R2, 2026-06-10). Labels: + // resource_type — postgres | vector | mongodb | redis + // result — ok | failed + // The pre-existing aggregate CustomerBackupSucceededTotal / + // CustomerBackupFailedTotal counters are retained (the old dashboard tiles + // + customer-backup-failed.json alert still read them); this Vec is the + // NEW surface that answers "is Mongo backing up but Redis silently + // failing?" — a question the aggregate counters can't. Rule 25: the + // matching alert + dashboard tile + METRICS-CATALOG row ship in the same + // PR (Prom rule: per-type success-ratio < threshold; NR tile: stacked by + // resource_type). Lazy *Vec — primed for all (type,result) pairs in + // metrics_test.go so /metrics exposes the series from process start. + CustomerBackupByTypeTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "instant_customer_backup_by_type_total", + Help: "Customer backup runs by resource_type (postgres|vector|mongodb|redis) and result (ok|failed).", + }, []string{"resource_type", "result"}) + // ResourceDegradedGauge is sampled at the end of each heartbeat run. // Labelled by resource_type so the dashboard can break down "how many // of my Postgres instances are unreachable right now". diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 60803bc..3f47e55 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -137,6 +137,15 @@ func TestAllMetrics_AreRegistered(t *testing.T) { OrphanDBSweepCandidatesTotal.WithLabelValues("customer_namespace").Add(0) OrphanDBSweepCandidatesTotal.WithLabelValues("redis_namespace").Add(0) + // Prime the R2 per-resource_type customer-backup counter for every + // (type,result) pair so the backup-health dashboard tile renders from + // process start (lazy *Vec otherwise leaves the Mongo/Redis series empty + // until the first real backup of each type). + for _, rt := range []string{"postgres", "vector", "mongodb", "redis"} { + CustomerBackupByTypeTotal.WithLabelValues(rt, "ok").Add(0) + CustomerBackupByTypeTotal.WithLabelValues(rt, "failed").Add(0) + } + // Plain gauge DeployIdleApps.Set(0)