diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d42c98b..ef6038d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -494,6 +494,101 @@ graph TB Helm --> K8s ``` +--- + +## Valkey ACL Provisioning + +Per-tenant cache isolation enforced at the Valkey protocol level. +Each site namespace gets a dedicated ACL user restricted to `~{site}:*` key patterns. + +### Flow: Initial Provisioning + +```mermaid +sequenceDiagram + participant GitOps as GitOps / Bootstrap + participant K8s as Kubernetes API + participant Ctrl as NamespaceReconciler + participant Valkey as Valkey (all nodes) + participant Webhook as Mutating Webhook + participant Pod as Site Pod + + GitOps->>K8s: kubectl annotate ns sites-loja
deco.sites/valkey-acl=true + K8s->>Ctrl: Reconcile Event + Ctrl->>Ctrl: Generate random password + Ctrl->>Valkey: ACL SETUSER loja on >pass ~loja:* ~lock:loja:*
(master + all replicas via Sentinel discovery) + Ctrl->>K8s: Create Secret "valkey-acl" in sites-loja
LOADER_CACHE_REDIS_USERNAME=loja
LOADER_CACHE_REDIS_PASSWORD=<generated> + Ctrl->>K8s: Patch ksvc spec.template.annotations
(triggers new Knative Revision) + K8s->>Webhook: Intercept ksvc update + Webhook->>K8s: Inject envFrom: secretRef: valkey-acl (optional=true) + K8s->>Pod: New pod starts with credentials + Pod->>Valkey: AUTH loja → reads/writes only ~loja:* +``` + +### Flow: Sentinel Failover Recovery + +```mermaid +sequenceDiagram + participant Sentinel as Valkey Sentinel + participant Ctrl as NamespaceReconciler + participant PubSub as +switch-master channel + participant Valkey as New Master + Replicas + + Note over Sentinel: Master fails + Sentinel->>Sentinel: Elect new master (quorum) + Sentinel->>PubSub: Publish +switch-master event + PubSub->>Ctrl: WatchFailover() receives event + Ctrl->>Ctrl: RecordSentinelFailover() metric++ + Ctrl->>Ctrl: TriggerResyncAll() — patch all annotated namespaces + loop For each managed namespace + Ctrl->>Valkey: ACL SETUSER (master + all replicas) + end + Note over Ctrl: Recovery in seconds, not minutes +``` + +### ACL Replication Caveat + +Valkey does **not** replicate `ACL SETUSER` commands to replicas. +The operator handles this by running ACL commands on every node individually: + +``` +UpsertUser(ctx, username, password) + ├── ACL SETUSER on master (via Sentinel FailoverClient) + ├── ACL SETUSER on replica-1 (direct connection, discovered via SENTINEL REPLICAS) + └── ACL SETUSER on replica-N +``` + +This ensures all nodes (master and read replicas) are always in sync, +which is required because site pods use a separate read-replica endpoint. + +### Periodic Resync + +The reconciler requeues every namespace on a configurable interval (default: 10min) +to ensure ACLs stay in sync after any undetected node restart. 
+ +``` +VALKEY_ACL_RESYNC_PERIOD=10m # env var or --valkey-acl-resync-period flag +``` + +To force immediate resync of all managed namespaces: + +```bash +kubectl annotate ns -l deco.sites/valkey-acl=true \ + deco.sites/valkey-acl-sync=$(date +%s) --overwrite +``` + +### Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `deco_operator_valkey_acl_provisioned_total` | Counter | ACL users provisioned | +| `deco_operator_valkey_acl_deleted_total` | Counter | ACL users deleted on namespace removal | +| `deco_operator_valkey_acl_errors_total{operation}` | Counter | Errors by operation (upsert/delete/check) | +| `deco_operator_valkey_acl_self_healed_total` | Counter | Re-provisions after ACL loss | +| `deco_operator_valkey_tenants_provisioned` | Gauge | Current provisioned tenants (seeded on startup) | +| `deco_operator_valkey_sentinel_failovers_total` | Counter | Sentinel +switch-master events detected | + +--- + ## Summary The Deco CMS Operator provides: @@ -504,7 +599,6 @@ The Deco CMS Operator provides: 4. **Reliable Notifications**: Parallel, time-bounded, with retries 5. **Deterministic Updates**: Timestamp-based verification 6. **Trackable Rollouts**: Watch conditions with commit/timestamp -7. **Production Ready**: Handles scale, failures, and edge cases - -**All with zero configuration required from users!** 🚀 +7. **Per-tenant Cache Isolation**: Valkey ACL per site, enforced at protocol level +8. 
**Production Ready**: Handles scale, failures, and edge cases diff --git a/chart/templates/clusterrole-operator-manager-role.yaml b/chart/templates/clusterrole-operator-manager-role.yaml index 993fb50..5a1777f 100644 --- a/chart/templates/clusterrole-operator-manager-role.yaml +++ b/chart/templates/clusterrole-operator-manager-role.yaml @@ -15,12 +15,30 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: - pods + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: - secrets verbs: + - create - get - list - watch @@ -57,4 +75,6 @@ rules: verbs: - get - list + - patch + - update - watch \ No newline at end of file diff --git a/chart/templates/deployment-operator-controller-manager.yaml b/chart/templates/deployment-operator-controller-manager.yaml index 466d680..3203945 100644 --- a/chart/templates/deployment-operator-controller-manager.yaml +++ b/chart/templates/deployment-operator-controller-manager.yaml @@ -31,11 +31,29 @@ spec: command: - /manager image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - {{- if .Values.github.token }} + {{- if or .Values.github.token .Values.valkey.sentinelUrls }} env: + {{- if .Values.github.token }} - name: GITHUB_TOKEN value: {{ .Values.github.token | quote }} {{- end }} + {{- if .Values.valkey.sentinelUrls }} + - name: VALKEY_SENTINEL_URLS + value: {{ .Values.valkey.sentinelUrls | quote }} + - name: VALKEY_SENTINEL_MASTER_NAME + value: {{ .Values.valkey.sentinelMasterName | quote }} + {{- if .Values.valkey.existingSecret }} + - name: VALKEY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.valkey.existingSecret | quote }} + key: {{ .Values.valkey.existingSecretKey | quote }} + {{- else if .Values.valkey.adminPassword }} + - name: VALKEY_ADMIN_PASSWORD + value: {{ .Values.valkey.adminPassword | quote }} + {{- end }} + {{- end }} + {{- end }} livenessProbe: httpGet: path: 
/healthz diff --git a/chart/values.yaml b/chart/values.yaml index 01746eb..26a219e 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -15,6 +15,19 @@ replicaCount: 1 github: token: "" +# Valkey (Redis) ACL provisioning +# When sentinelUrls is set, the operator provisions per-tenant ACL users in Valkey +# for each namespace annotated with deco.sites/valkey-acl: "true". +# Leave sentinelUrls empty to disable ACL provisioning (NoopClient). +valkey: + sentinelUrls: "" + sentinelMasterName: "mymaster" + # adminPassword: inline value (not recommended for production). + # Use existingSecret + existingSecretKey to source from a K8s Secret instead. + adminPassword: "" + existingSecret: "" + existingSecretKey: "password" + # Resource limits and requests resources: limits: diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go new file mode 100644 index 0000000..8248e9c --- /dev/null +++ b/cmd/bootstrap/main.go @@ -0,0 +1,118 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// bootstrap is a one-shot CLI that annotates existing site Namespaces so the +// NamespaceReconciler picks them up and provisions Valkey ACL credentials. 
+// +// Usage: +// +// go run ./cmd/bootstrap --namespace-pattern sites- +// go run ./cmd/bootstrap --namespace-pattern sites- --dry-run +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +const valkeyACLAnnotation = "deco.sites/valkey-acl" + +func main() { + var namespacePattern string + var dryRun bool + + flag.StringVar(&namespacePattern, "namespace-pattern", "sites-", + "Prefix used to filter Namespaces eligible for Valkey ACL provisioning.") + flag.BoolVar(&dryRun, "dry-run", false, + "Print which Namespaces would be annotated without making changes.") + + opts := zap.Options{Development: true} + opts.BindFlags(flag.CommandLine) + flag.Parse() + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + log := ctrl.Log.WithName("bootstrap") + + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + cfg, err := ctrl.GetConfig() + if err != nil { + log.Error(err, "Failed to get kubeconfig") + os.Exit(1) + } + + k8s, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "Failed to create Kubernetes client") + os.Exit(1) + } + + ctx := context.Background() + + nsList := &corev1.NamespaceList{} + if err := k8s.List(ctx, nsList); err != nil { + log.Error(err, "Failed to list Namespaces") + os.Exit(1) + } + + annotated, skipped := 0, 0 + for i := range nsList.Items { + ns := &nsList.Items[i] + + if !strings.HasPrefix(ns.Name, namespacePattern) { + continue + } + + if ns.Annotations[valkeyACLAnnotation] == "true" { + log.Info("Already annotated, skipping", "namespace", ns.Name) + skipped++ + continue + } + + if dryRun { + 
fmt.Printf("[dry-run] would annotate namespace %s\n", ns.Name) + annotated++ + continue + } + + patch := client.MergeFrom(ns.DeepCopy()) + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations[valkeyACLAnnotation] = "true" + if err := k8s.Patch(ctx, ns, patch); err != nil { + log.Error(err, "Failed to annotate namespace", "namespace", ns.Name) + continue + } + log.Info("Annotated namespace", "namespace", ns.Name) + annotated++ + } + + log.Info("Bootstrap complete", "annotated", annotated, "skipped", skipped) +} diff --git a/cmd/main.go b/cmd/main.go index 6328064..7128821 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,10 +17,14 @@ limitations under the License. package main import ( + "context" "crypto/tls" "flag" + "fmt" "os" "path/filepath" + "strings" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -33,6 +37,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -41,6 +46,7 @@ import ( decositesv1alpha1 "github.com/deco-sites/decofile-operator/api/v1alpha1" "github.com/deco-sites/decofile-operator/internal/controller" + "github.com/deco-sites/decofile-operator/internal/valkey" webhookv1 "github.com/deco-sites/decofile-operator/internal/webhook/v1" // +kubebuilder:scaffold:imports ) @@ -68,6 +74,12 @@ func main() { var secureMetrics bool var enableHTTP2 bool var tlsOpts []func(*tls.Config) + var valkeyURL string + var valkeySentinelURLs string + var valkeySentinelMaster string + var valkeyAdminPassword string + var valkeyResyncPeriod time.Duration + var valkeyWatchFailover bool flag.StringVar(&metricsAddr, 
"metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -85,6 +97,22 @@ func main() { flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") + flag.StringVar(&valkeyURL, "valkey-url", os.Getenv("VALKEY_URL"), + "Direct Valkey address (host:port). Takes precedence over sentinel. For local dev only.") + flag.StringVar(&valkeySentinelURLs, "valkey-sentinel-urls", os.Getenv("VALKEY_SENTINEL_URLS"), + "Comma-separated list of Valkey Sentinel addresses (host:port). When empty, ACL provisioning is disabled.") + flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master", + getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"), + "Valkey Sentinel master name.") + flag.StringVar(&valkeyAdminPassword, "valkey-admin-password", os.Getenv("VALKEY_ADMIN_PASSWORD"), + "Password for the Valkey admin user used to manage ACLs.") + flag.DurationVar(&valkeyResyncPeriod, "valkey-acl-resync-period", + parseDuration(os.Getenv("VALKEY_ACL_RESYNC_PERIOD"), controller.DefaultResyncPeriod), + "How often to re-sync ACL users to all Valkey nodes (e.g. 10m, 30m, 1h).") + flag.BoolVar(&valkeyWatchFailover, "valkey-watch-failover", + os.Getenv("VALKEY_WATCH_FAILOVER") != "false", + "Subscribe to Sentinel +switch-master events and trigger immediate ACL resync on failover. "+ + "Enabled by default when VALKEY_SENTINEL_URLS is set. Set VALKEY_WATCH_FAILOVER=false to disable.") opts := zap.Options{ Development: true, } @@ -206,6 +234,66 @@ func main() { os.Exit(1) } + // Build Valkey client. 
Falls back to a no-op client when Sentinel URLs are not configured + // so the operator works in environments where Valkey ACL provisioning is not needed. + var valkeyClient valkey.Client + switch { + case valkeyURL != "": + valkeyClient = valkey.NewDirectClient(valkeyURL, valkeyAdminPassword) + defer func() { _ = valkeyClient.Close() }() + setupLog.Info("Valkey ACL provisioning enabled (direct)", "addr", valkeyURL) + case valkeySentinelURLs != "": + valkeyClient = valkey.NewSentinelClient(valkey.Config{ + SentinelAddrs: strings.Split(valkeySentinelURLs, ","), + MasterName: valkeySentinelMaster, + AdminPassword: valkeyAdminPassword, + }) + defer func() { _ = valkeyClient.Close() }() + setupLog.Info("Valkey ACL provisioning enabled (sentinel)", + "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) + default: + valkeyClient = valkey.NoopClient{} + setupLog.Info("Valkey ACL provisioning disabled (set VALKEY_URL or VALKEY_SENTINEL_URLS)") + } + + nsReconciler := &controller.NamespaceReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ValkeyClient: valkeyClient, + ResyncPeriod: valkeyResyncPeriod, + } + if err := nsReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Namespace") + os.Exit(1) + } + // Start Sentinel failover watcher if enabled and Sentinel is configured. + // leaderElectedRunnable ensures only the active leader subscribes — prevents + // redundant TriggerResyncAll calls from non-leader replicas. + // Fail-safe: if subscription fails, operator continues with periodic resync. 
+ if valkeyWatchFailover && valkeySentinelURLs != "" { + if err := mgr.Add(&leaderElectedRunnable{fn: func(ctx context.Context) error { + return valkeyClient.WatchFailover(ctx, func() { + controller.RecordSentinelFailover() + nsReconciler.TriggerResyncAll(ctx) + }) + }}); err != nil { + setupLog.Error(err, "unable to add Sentinel failover watcher (non-fatal)") + } else { + setupLog.Info("Sentinel failover watcher enabled") + } + } + + // Seed the tenants_provisioned gauge from current cluster state once the cache is warm. + if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + if !mgr.GetCache().WaitForCacheSync(ctx) { + return fmt.Errorf("cache never synced") + } + return nsReconciler.InitMetrics(ctx) + })); err != nil { + setupLog.Error(err, "unable to add metrics init runnable") + os.Exit(1) + } + // Create shared HTTP client for pod notifications to prevent connection leaks httpClient := controller.NewHTTPClient() @@ -261,3 +349,36 @@ func main() { os.Exit(1) } } + +// leaderElectedRunnable wraps a function so it only runs on the active leader. +// controller-runtime starts Runnables on all replicas by default; implementing +// NeedLeaderElection() restricts execution to the leader pod. 
+type leaderElectedRunnable struct { + fn func(ctx context.Context) error +} + +func (r *leaderElectedRunnable) Start(ctx context.Context) error { + return r.fn(ctx) +} + +func (r *leaderElectedRunnable) NeedLeaderElection() bool { + return true +} + +func parseDuration(s string, fallback time.Duration) time.Duration { + if s == "" { + return fallback + } + d, err := time.ParseDuration(s) + if err != nil { + return fallback + } + return d +} + +func getEnvOrDefault(key, defaultVal string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultVal +} diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index d547341..9b28df2 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -16,12 +16,30 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: - pods + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: - secrets verbs: + - create - get - list - watch @@ -58,4 +76,6 @@ rules: verbs: - get - list + - patch + - update - watch diff --git a/go.mod b/go.mod index c0b8155..7f73982 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -56,6 +57,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.17.0 // indirect + github.com/redis/go-redis/v9 v9.18.0 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.10 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect 
@@ -69,6 +71,7 @@ require ( go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect diff --git a/go.sum b/go.sum index ecfa09d..338285b 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls= @@ -114,6 +116,8 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= +github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= github.com/rogpeppe/go-internal v1.13.1 
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -162,6 +166,8 @@ go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42s go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4= go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= diff --git a/hack/helm-generator/main.go b/hack/helm-generator/main.go index 493fac0..26cbfb8 100644 --- a/hack/helm-generator/main.go +++ b/hack/helm-generator/main.go @@ -83,9 +83,9 @@ func main() { fileCount++ } - // Add GITHUB_TOKEN to deployment - if err := addGitHubTokenToDeployment(templatesDir); err != nil { - fmt.Fprintf(os.Stderr, "Warning: Could not add GITHUB_TOKEN: %v\n", err) + // Add env vars to deployment + if err := addEnvVarsToDeployment(templatesDir); err != nil { + fmt.Fprintf(os.Stderr, "Warning: Could not add env vars to deployment: %v\n", err) } fmt.Printf("✓ Generated %d Helm templates\n\n", fileCount) @@ -172,7 +172,7 @@ func addConditionals(content, kind string) string { return content } -func addGitHubTokenToDeployment(templatesDir string) error { +func addEnvVarsToDeployment(templatesDir string) error { files, err := filepath.Glob(filepath.Join(templatesDir, "deployment-*.yaml")) if err != nil || len(files) == 0 { return fmt.Errorf("no deployment file found") 
@@ -187,10 +187,28 @@ func addGitHubTokenToDeployment(templatesDir string) error { contentStr := string(content) // Find the image line and add env vars after it - envBlock := ` {{- if .Values.github.token }} + envBlock := ` {{- if or .Values.github.token .Values.valkey.sentinelUrls }} env: + {{- if .Values.github.token }} - name: GITHUB_TOKEN value: {{ .Values.github.token | quote }} + {{- end }} + {{- if .Values.valkey.sentinelUrls }} + - name: VALKEY_SENTINEL_URLS + value: {{ .Values.valkey.sentinelUrls | quote }} + - name: VALKEY_SENTINEL_MASTER_NAME + value: {{ .Values.valkey.sentinelMasterName | quote }} + {{- if .Values.valkey.existingSecret }} + - name: VALKEY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.valkey.existingSecret | quote }} + key: {{ .Values.valkey.existingSecretKey | quote }} + {{- else if .Values.valkey.adminPassword }} + - name: VALKEY_ADMIN_PASSWORD + value: {{ .Values.valkey.adminPassword | quote }} + {{- end }} + {{- end }} {{- end }}` re := regexp.MustCompile(`(?m)( image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}")`) diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go new file mode 100644 index 0000000..d0fdc95 --- /dev/null +++ b/internal/controller/metrics.go @@ -0,0 +1,90 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +var ( + // valkeyACLProvisioned counts successful ACL user + Secret provisioning operations. + valkeyACLProvisioned = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_provisioned_total", + Help: "Total number of Valkey ACL users provisioned (new or re-provisioned).", + }) + + // valkeyACLDeleted counts ACL user deletions triggered by namespace removal. + valkeyACLDeleted = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_deleted_total", + Help: "Total number of Valkey ACL users deleted on namespace removal.", + }) + + // valkeyACLErrors counts failures when interacting with Valkey. + valkeyACLErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_errors_total", + Help: "Total number of Valkey ACL operation errors by operation type.", + }, []string{"operation"}) // operation: upsert | delete | check + + // valkeyACLSelfHealed counts how many times an ACL was re-created after Valkey restart. + valkeyACLSelfHealed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_self_healed_total", + Help: "Total number of Valkey ACL users re-provisioned after being lost (e.g. Valkey restart).", + }) + + // valkeyTenantsProvisioned tracks the current number of provisioned tenants (gauge). + valkeyTenantsProvisioned = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "tenants_provisioned", + Help: "Current number of site namespaces with a provisioned Valkey ACL user.", + }) + + // valkeySentinelFailovers counts Sentinel +switch-master events received. + // Each event triggers an immediate full ACL resync to all nodes. 
+ valkeySentinelFailovers = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "sentinel_failovers_total", + Help: "Total number of Sentinel master failovers detected via +switch-master pub/sub.", + }) +) + +// RecordSentinelFailover increments the sentinel_failovers_total counter. +// Called from main.go when a +switch-master event is received. +func RecordSentinelFailover() { + valkeySentinelFailovers.Inc() +} + +func init() { + metrics.Registry.MustRegister( + valkeyACLProvisioned, + valkeyACLDeleted, + valkeyACLErrors, + valkeyACLSelfHealed, + valkeyTenantsProvisioned, + valkeySentinelFailovers, + ) +} diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go new file mode 100644 index 0000000..ccdef03 --- /dev/null +++ b/internal/controller/namespace_controller.go @@ -0,0 +1,359 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "crypto/rand" + "encoding/base64" + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + servingknativedevv1 "knative.dev/serving/pkg/apis/serving/v1" + + "github.com/deco-sites/decofile-operator/internal/valkey" +) + +const ( + valkeyACLAnnotation = "deco.sites/valkey-acl" + valkeyACLFinalizer = "deco.sites/valkey-acl" + valkeySecretName = "valkey-acl" + valkeyProvisionedAnnot = "deco.sites/valkey-acl-provisioned" + + siteNamespacePrefix = "sites-" +) + +// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create +// +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;update;patch + +// DefaultResyncPeriod is the default interval at which the reconciler re-syncs +// ACL users to all Valkey nodes even when nothing changed. Configurable via +// VALKEY_ACL_RESYNC_PERIOD (e.g. "10m", "30m", "1h"). +const DefaultResyncPeriod = 10 * time.Minute + +// valkeyACLAnnotationValue is the expected value of the opt-in annotation. +const valkeyACLAnnotationValue = "true" + +// NamespaceReconciler provisions per-tenant Valkey ACL credentials for site namespaces. +// When a Namespace has the annotation "deco.sites/valkey-acl: true", the reconciler: +// - Creates a Valkey ACL user restricted to the site's key prefix. 
+// - Creates a K8s Secret "valkey-acl" in that namespace with the credentials. +// - Patches the Knative Service to trigger a new Revision that picks up the Secret. +// - Cleans up the Valkey ACL user when the namespace is deleted. +// +// # What triggers a reconcile +// +// - Namespace created or annotated with deco.sites/valkey-acl: "true" +// - Secret "valkey-acl" deleted in a managed namespace (via Watches) +// - Periodic requeue (RequeueAfter: 10min) for self-healing after Valkey restarts +// +// To force an immediate full resync of all managed namespaces (e.g. after a Valkey +// failover), touch all annotated namespaces to trigger reconcile on each: +// +// kubectl annotate ns -l deco.sites/valkey-acl=true \ +// deco.sites/valkey-acl-sync=$(date +%s) --overwrite +// +// # ACL replication caveat +// +// Valkey does NOT replicate ACL commands (ACL SETUSER/DELUSER) to replicas. +// The operator runs ACL commands only on the current Sentinel master. This means: +// +// 1. After a Sentinel failover, the new master starts without per-tenant ACLs. +// The periodic reconcile (10min) detects this and re-provisions all users. +// During the recovery window, deco falls back to FILE_SYSTEM cache. +// +// 2. Read replicas used by pods (LOADER_CACHE_REDIS_READ_URL) also lack the +// per-tenant ACL users. This is not enforced today (auth.enabled: false), +// but MUST be addressed before enabling Valkey auth in production. +// +// TODO: when enabling auth, extend ValkeyClient to provision ACL SETUSER on +// all nodes (master + every replica), not only the Sentinel master. +type NamespaceReconciler struct { + client.Client + Scheme *runtime.Scheme + ValkeyClient valkey.Client + ResyncPeriod time.Duration +} + +// TriggerResyncAll immediately re-queues all managed namespaces by updating a +// sync annotation. Called on Sentinel failover events to recover ACLs without +// waiting for the next periodic resync cycle. 
+func (r *NamespaceReconciler) TriggerResyncAll(ctx context.Context) { + log := logf.FromContext(ctx).WithName("valkey-resync") + nsList := &corev1.NamespaceList{} + if err := r.List(ctx, nsList); err != nil { + log.Error(err, "Failed to list namespaces for resync") + return + } + now := time.Now().UTC().Format(time.RFC3339) + count := 0 + for i := range nsList.Items { + ns := &nsList.Items[i] + if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue { + continue + } + patch := client.MergeFrom(ns.DeepCopy()) + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations["deco.sites/valkey-acl-sync"] = now + if err := r.Patch(ctx, ns, patch); err != nil { + log.Error(err, "Failed to patch namespace for resync", "namespace", ns.Name) + continue + } + count++ + } + log.Info("Triggered ACL resync on all managed namespaces", "count", count) +} + +// InitMetrics seeds the tenants_provisioned gauge from current cluster state. +// Must be called after the cache is synced (i.e. inside a Runnable or after mgr.Start). +func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error { + nsList := &corev1.NamespaceList{} + if err := r.List(ctx, nsList); err != nil { + return err + } + count := 0.0 + for _, ns := range nsList.Items { + if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue { + continue + } + secret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}, secret); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("reading secret in %s: %w", ns.Name, err) + } + continue + } + count++ + } + valkeyTenantsProvisioned.Set(count) + return nil +} + +// SetupWithManager registers the Namespace controller with a resync period for +// self-healing (recovers ACLs lost after a Valkey restart). +func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Watch Secrets named "valkey-acl" and enqueue the parent Namespace. 
+	// Namespace is cluster-scoped so Owns() (which relies on owner references) cannot
+	// be used across scopes. Instead we map Secret → Namespace by name.
+	secretToNamespace := handler.EnqueueRequestsFromMapFunc(
+		func(_ context.Context, obj client.Object) []reconcile.Request {
+			if obj.GetName() != valkeySecretName {
+				return nil // ignore unrelated Secrets
+			}
+			// The cluster-scoped request's Name is the namespace the Secret
+			// lives in; the request's Namespace field stays empty.
+			return []reconcile.Request{
+				{NamespacedName: types.NamespacedName{Name: obj.GetNamespace()}},
+			}
+		},
+	)
+
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.Namespace{}).
+		Watches(&corev1.Secret{}, secretToNamespace).
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: 4,
+		}).
+		Named("namespace-valkey").
+		Complete(r)
+}
+
+// Reconcile provisions (or repairs) the per-tenant Valkey ACL user and credential
+// Secret for an opted-in namespace, and tears both down on namespace deletion.
+func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	log := logf.FromContext(ctx).WithValues("namespace", req.Name)
+
+	ns := &corev1.Namespace{}
+	if err := r.Get(ctx, req.NamespacedName, ns); err != nil {
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
+
+	// Only process namespaces with the opt-in annotation.
+	if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue {
+		return ctrl.Result{}, nil
+	}
+
+	siteName := siteNameFromNamespace(ns.Name)
+
+	// Handle deletion: remove the Valkey ACL user before the namespace is gone.
+	if !ns.DeletionTimestamp.IsZero() {
+		if controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+			log.Info("Deleting Valkey ACL user", "user", siteName)
+			if err := r.ValkeyClient.DeleteUser(ctx, siteName); err != nil {
+				// Keep the finalizer so deletion is retried until Valkey confirms.
+				log.Error(err, "Failed to delete Valkey ACL user, will retry")
+				valkeyACLErrors.WithLabelValues("delete").Inc()
+				return ctrl.Result{}, err
+			}
+			valkeyACLDeleted.Inc()
+			// NOTE(review): Dec() assumes this tenant was previously counted as
+			// provisioned; if the Secret was never created the gauge could drift
+			// below zero — confirm against InitMetrics semantics.
+			valkeyTenantsProvisioned.Dec()
+			log.Info("Valkey ACL user deleted", "user", siteName)
+			controllerutil.RemoveFinalizer(ns, valkeyACLFinalizer)
+			if err := r.Update(ctx, ns); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+		return ctrl.Result{}, nil
+	}
+
+	// Ensure finalizer is present so we can clean up on deletion.
+	if !controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+		controllerutil.AddFinalizer(ns, valkeyACLFinalizer)
+		if err := r.Update(ctx, ns); err != nil {
+			return ctrl.Result{}, err
+		}
+	}
+
+	// Check whether the credential Secret already exists.
+	secret := &corev1.Secret{}
+	secretKey := types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}
+	err := r.Get(ctx, secretKey, secret)
+
+	switch {
+	case errors.IsNotFound(err):
+		// Secret does not exist: generate credentials, create ACL user and Secret.
+		password, genErr := generatePassword()
+		if genErr != nil {
+			return ctrl.Result{}, fmt.Errorf("generate password: %w", genErr)
+		}
+
+		// ACL user is created on Valkey BEFORE the Secret: if the Secret write
+		// below fails, the next reconcile sees IsNotFound again, regenerates a
+		// password and upserts again (UpsertUser is idempotent).
+		log.Info("Provisioning Valkey ACL user", "user", siteName)
+		if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+			valkeyACLErrors.WithLabelValues("upsert").Inc()
+			return ctrl.Result{}, fmt.Errorf("upsert Valkey user: %w", upsertErr)
+		}
+
+		if createErr := r.createSecret(ctx, ns.Name, siteName, password); createErr != nil {
+			return ctrl.Result{}, fmt.Errorf("create secret: %w", createErr)
+		}
+
+		valkeyACLProvisioned.Inc()
+		valkeyTenantsProvisioned.Inc()
+		log.Info("Valkey ACL provisioned", "user", siteName, "namespace", ns.Name)
+
+		// Trigger a new Knative Revision so running pods pick up the new Secret.
+		// Failure here is logged but not returned: pods get the credentials on
+		// their next rollout regardless.
+		if patchErr := r.patchKnativeServiceTimestamp(ctx, ns.Name); patchErr != nil {
+			log.Error(patchErr, "Failed to patch Knative Service (non-fatal)")
+		}
+
+	case err != nil:
+		return ctrl.Result{}, fmt.Errorf("get secret: %w", err)
+
+	default:
+		// Secret exists. Always call UpsertUser on the periodic reconcile — it is
+		// idempotent and ensures ACLs are present on ALL nodes (master + replicas).
+		// Valkey does not replicate ACL commands, so a single UpsertUser call from
+		// the operator is the only way to keep every node in sync. This also covers:
+		// - Valkey master restart (ACLs lost from memory)
+		// - Replica replacement or restart
+		// - Sentinel failover (new master starts without ACLs)
+		password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"])
+		if password == "" {
+			// Cannot sync without a password; keep requeueing so a manual fix
+			// (or Secret deletion → fresh provisioning) is eventually picked up.
+			log.Error(nil, "valkey-acl Secret has empty password, skipping ACL sync", "namespace", ns.Name)
+			return ctrl.Result{RequeueAfter: r.ResyncPeriod}, nil
+		}
+		// UserExists is queried only to distinguish "self-heal" from a routine
+		// sync in logs/metrics; the upsert below runs either way.
+		existed, checkErr := r.ValkeyClient.UserExists(ctx, siteName)
+		if checkErr != nil {
+			valkeyACLErrors.WithLabelValues("check").Inc()
+			return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr)
+		}
+		if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+			valkeyACLErrors.WithLabelValues("upsert").Inc()
+			return ctrl.Result{}, fmt.Errorf("sync Valkey user: %w", upsertErr)
+		}
+		if !existed {
+			valkeyACLSelfHealed.Inc()
+			log.Info("Valkey ACL user re-provisioned (self-heal)", "user", siteName)
+		} else {
+			log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName)
+		}
+	}
+
+	// Requeue periodically to sync ACLs to all Valkey nodes.
+	return ctrl.Result{RequeueAfter: r.ResyncPeriod}, nil
+}
+
+// createSecret creates the "valkey-acl" Secret in the given namespace with
+// credentials ready to be consumed by deco via LOADER_CACHE_REDIS_USERNAME/PASSWORD.
+// Create (not server-side apply) is intentional: if a concurrent reconcile won
+// the race, this returns AlreadyExists and the caller retries.
+func (r *NamespaceReconciler) createSecret(ctx context.Context, namespace, username, password string) error {
+	secret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      valkeySecretName,
+			Namespace: namespace,
+		},
+		StringData: map[string]string{
+			"LOADER_CACHE_REDIS_USERNAME": username,
+			"LOADER_CACHE_REDIS_PASSWORD": password,
+		},
+	}
+	return r.Create(ctx, secret)
+}
+
+// patchKnativeServiceTimestamp adds/updates the "deco.sites/valkey-acl-provisioned"
+// annotation on every Knative Service in the namespace. This causes Knative to create
+// a new Revision whose pods will mount the just-created valkey-acl Secret.
+func (r *NamespaceReconciler) patchKnativeServiceTimestamp(ctx context.Context, namespace string) error {
+	log := logf.FromContext(ctx)
+
+	svcList := &servingknativedevv1.ServiceList{}
+	if err := r.List(ctx, svcList, client.InNamespace(namespace)); err != nil {
+		return fmt.Errorf("list Knative Services: %w", err)
+	}
+
+	now := time.Now().UTC().Format(time.RFC3339)
+	failed := 0
+	for i := range svcList.Items {
+		svc := &svcList.Items[i]
+		patch := client.MergeFrom(svc.DeepCopy())
+		// Must annotate spec.template, not metadata — Knative only creates a new
+		// Revision when spec.template changes.
+		if svc.Spec.Template.Annotations == nil {
+			svc.Spec.Template.Annotations = make(map[string]string)
+		}
+		svc.Spec.Template.Annotations[valkeyProvisionedAnnot] = now
+		if err := r.Patch(ctx, svc, patch); err != nil {
+			// Keep going: one broken Service must not block the others.
+			log.Error(err, "Failed to patch Knative Service", "service", svc.Name)
+			failed++
+		}
+	}
+	// Surface partial failure to the caller (which logs it as non-fatal) instead
+	// of silently reporting success after per-service errors.
+	if failed > 0 {
+		return fmt.Errorf("patching Knative Services in %s: %d of %d failed", namespace, failed, len(svcList.Items))
+	}
+	return nil
+}
+
+// siteNameFromNamespace derives the Valkey ACL username from the K8s namespace name.
+// The "sites-" prefix is stripped when present so the username matches DECO_SITE_NAME.
+func siteNameFromNamespace(namespace string) string {
+	return strings.TrimPrefix(namespace, siteNamespacePrefix)
+}
+
+// generatePassword produces a cryptographically random 32-byte URL-safe base64 string.
+func generatePassword() (string, error) {
+	b := make([]byte, 32)
+	if _, err := rand.Read(b); err != nil {
+		return "", err
+	}
+	return base64.URLEncoding.EncodeToString(b), nil
+}
diff --git a/internal/valkey/client.go b/internal/valkey/client.go
new file mode 100644
index 0000000..04ff9d6
--- /dev/null
+++ b/internal/valkey/client.go
@@ -0,0 +1,281 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package valkey
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// Client manages Valkey ACL users for tenant isolation.
+type Client interface {
+	// UpsertUser creates or replaces a Valkey ACL user restricted to the given prefix.
+	UpsertUser(ctx context.Context, username, password string) error
+	// DeleteUser removes a Valkey ACL user.
+	DeleteUser(ctx context.Context, username string) error
+	// UserExists checks whether a Valkey ACL user exists on the master.
+	UserExists(ctx context.Context, username string) (bool, error)
+	// WatchFailover subscribes to Sentinel "+switch-master" events and calls
+	// onFailover whenever a new master is elected. It reconnects automatically
+	// on disconnect and returns only when ctx is cancelled. It is a no-op for
+	// non-Sentinel clients. Errors are non-fatal — if Sentinel is unreachable
+	// the operator continues with its periodic resync.
+	WatchFailover(ctx context.Context, onFailover func()) error
+	// Close releases the underlying connection.
+	Close() error
+}
+
+// Config holds the configuration required to connect to Valkey via Sentinel.
+type Config struct {
+	SentinelAddrs []string // host:port of each Sentinel endpoint
+	MasterName    string   // master set name as known by Sentinel
+	AdminPassword string   // shared by Sentinel, master and replica connections
+}
+
+// sentinelClient provisions ACL users on the Sentinel master AND on every
+// known replica. This is necessary because Valkey does not replicate ACL
+// commands — each node maintains its own independent ACL table. Without
+// provisioning on replicas, pods that connect via the read-replica endpoint
+// (LOADER_CACHE_REDIS_READ_URL) will fail authentication once auth is enabled.
+type sentinelClient struct {
+	// master handles writes and ACL operations via Sentinel leader election.
+	master *redis.Client
+	cfg    Config // zero cfg (no SentinelAddrs) = direct mode: no replicas, no failover watch
+}
+
+// NewDirectClient returns a Client with a direct connection to a single Valkey instance.
+// Intended for local development and testing — not for production use.
+func NewDirectClient(addr, password string) Client {
+	rdb := redis.NewClient(&redis.Options{
+		Addr:     addr,
+		Password: password,
+	})
+	return &sentinelClient{master: rdb} // empty cfg makes replica/watch paths no-ops
+}
+
+// NewSentinelClient returns a Client that provisions ACL users on the Sentinel
+// master and all replicas.
+func NewSentinelClient(cfg Config) Client {
+	master := redis.NewFailoverClient(&redis.FailoverOptions{
+		MasterName:    cfg.MasterName,
+		SentinelAddrs: cfg.SentinelAddrs,
+		Password:      cfg.AdminPassword,
+	})
+	return &sentinelClient{master: master, cfg: cfg}
+}
+
+// UpsertUser provisions a per-tenant ACL user on the master and all replicas.
+func (c *sentinelClient) UpsertUser(ctx context.Context, username, password string) error {
+	if err := c.aclSetUser(ctx, c.master, username, password); err != nil { // master first
+		return err
+	}
+	return c.forEachReplica(ctx, func(rdb *redis.Client) error { // then fan out; ACLs are not replicated
+		return c.aclSetUser(ctx, rdb, username, password)
+	})
+}
+
+// DeleteUser removes a per-tenant ACL user from the master and all replicas.
+func (c *sentinelClient) DeleteUser(ctx context.Context, username string) error {
+	if err := c.rdo(ctx, c.master, "ACL", "DELUSER", username); err != nil { // master first
+		return fmt.Errorf("ACL DELUSER %s on master: %w", username, err)
+	}
+	return c.forEachReplica(ctx, func(rdb *redis.Client) error { // then every replica
+		if err := c.rdo(ctx, rdb, "ACL", "DELUSER", username); err != nil {
+			return fmt.Errorf("ACL DELUSER %s on replica: %w", username, err)
+		}
+		return nil
+	})
+}
+
+// UserExists checks for the presence of a Valkey ACL user on the master.
+// Replicas are not checked — if the master has the user, replicas should too.
+func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, error) {
+	err := c.master.Do(ctx, "ACL", "GETUSER", username).Err()
+	if err == nil {
+		return true, nil
+	}
+	if errors.Is(err, redis.Nil) { // nil reply: user not found
+		return false, nil
+	}
+	if strings.Contains(err.Error(), "No such user") { // error-reply variant; text is server-version-dependent — TODO confirm
+		return false, nil
+	}
+	return false, fmt.Errorf("ACL GETUSER %s: %w", username, err)
+}
+
+// WatchFailover subscribes to the Sentinel "+switch-master" pub/sub channel.
+// It calls onFailover whenever a master election is detected, then reconnects
+// automatically. Safe to call even if Sentinel is temporarily unavailable —
+// errors are logged and retried with backoff, never propagated to the caller.
+func (c *sentinelClient) WatchFailover(ctx context.Context, onFailover func()) error {
+	if len(c.cfg.SentinelAddrs) == 0 {
+		return nil // direct client — no Sentinel to watch
+	}
+	logger := log.FromContext(ctx).WithName("valkey-failover-watch")
+	go func() {
+		backoff := 2 * time.Second
+		for {
+			if err := ctx.Err(); err != nil {
+				return
+			}
+			if err := c.subscribeOnce(ctx, onFailover); err != nil {
+				logger.Error(err, "Sentinel pub/sub disconnected, retrying", "backoff", backoff)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(backoff):
+					if backoff < 2*time.Minute { // cap exponential growth at 2min
+						backoff *= 2
+					}
+				}
+			} else {
+				backoff = 2 * time.Second // reset on clean exit
+			}
+		}
+	}()
+	return nil
+}
+
+// subscribeOnce opens one pub/sub session and blocks until the connection drops
+// or ctx is cancelled. It calls onFailover for each +switch-master event received.
+func (c *sentinelClient) subscribeOnce(ctx context.Context, onFailover func()) error {
+	logger := log.FromContext(ctx).WithName("valkey-failover-watch")
+	// Connect to the first reachable sentinel for pub/sub.
+	sc := redis.NewSentinelClient(&redis.Options{
+		Addr:     c.cfg.SentinelAddrs[0],
+		Password: c.cfg.AdminPassword,
+	})
+	ps := sc.Subscribe(ctx, "+switch-master")
+	// Single cleanup: close the pub/sub before its parent sentinel connection.
+	defer func() { _ = ps.Close(); _ = sc.Close() }()
+	logger.Info("Subscribed to Sentinel +switch-master events")
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case msg, ok := <-ps.Channel():
+			if !ok {
+				return fmt.Errorf("channel closed")
+			}
+			logger.Info("Sentinel failover detected, triggering ACL resync",
+				"event", msg.Payload)
+			onFailover()
+		}
+	}
+}
+
+// FailoverEventHook is called by WatchFailover on each detected failover event.
+// Used by main.go to increment the sentinel_failovers_total metric without
+// creating a dependency between the valkey and controller packages.
+type FailoverEventHook func()
+
+// Close closes all underlying connections.
+func (c *sentinelClient) Close() error {
+	return c.master.Close()
+}
+
+// aclSetUser issues ACL SETUSER on a single node.
+func (c *sentinelClient) aclSetUser(ctx context.Context, rdb *redis.Client, username, password string) error {
+	args := []interface{}{
+		"ACL", "SETUSER", username,
+		"on",                       // enable the user
+		"resetpass",                // drop previously configured passwords...
+		">" + password,             // ...then set exactly one password
+		"resetkeys",                // drop previous key patterns...
+		"~" + username + ":*",      // ...allow the tenant key prefix
+		"~lock:" + username + ":*", // ...and the tenant lock keys
+		"resetchannels",            // no pub/sub channels
+		"nocommands",               // start from an empty command set...
+		"+@read",                   // ...allow read commands
+		"+@write",                  // ...allow write commands
+		"+ping",                    // ...allow health checks
+	}
+	if err := rdb.Do(ctx, args...).Err(); err != nil {
+		return fmt.Errorf("ACL SETUSER %s: %w", username, err)
+	}
+	return nil
+}
+
+// rdo runs a single Redis command on the given client.
+func (c *sentinelClient) rdo(ctx context.Context, rdb *redis.Client, args ...interface{}) error {
+	return rdb.Do(ctx, args...).Err()
+}
+
+// forEachReplica discovers all current replicas via Sentinel and runs fn on each.
+// It tries each sentinel address until one responds, avoiding a single point of
+// failure. All replica errors are collected and logged; provisioning continues
+// even if individual replicas are unreachable.
+func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Client) error) error {
+	if len(c.cfg.SentinelAddrs) == 0 {
+		return nil // direct client — no replicas to discover
+	}
+	logger := log.FromContext(ctx)
+
+	replicas, err := c.sentinelReplicas(ctx)
+	if err != nil {
+		return fmt.Errorf("discover replicas: %w", err)
+	}
+
+	var errs []error
+	for _, r := range replicas {
+		addr := r["ip"] + ":" + r["port"] // SENTINEL REPLICAS returns per-replica field maps
+		rdb := redis.NewClient(&redis.Options{
+			Addr:     addr,
+			Password: c.cfg.AdminPassword,
+		})
+		if fnErr := fn(rdb); fnErr != nil {
+			logger.Error(fnErr, "ACL operation failed on replica", "addr", addr)
+			errs = append(errs, fmt.Errorf("replica %s: %w", addr, fnErr))
+		}
+		_ = rdb.Close() // per-replica client; closed immediately (no defer-in-loop)
+	}
+	return errors.Join(errs...)
+}
+
+// sentinelReplicas queries each sentinel address until one returns the replica list.
+func (c *sentinelClient) sentinelReplicas(ctx context.Context) ([]map[string]string, error) {
+	var lastErr error
+	for _, addr := range c.cfg.SentinelAddrs {
+		sc := redis.NewSentinelClient(&redis.Options{
+			Addr:     addr,
+			Password: c.cfg.AdminPassword,
+		})
+		replicas, err := sc.Replicas(ctx, c.cfg.MasterName).Result()
+		_ = sc.Close() // short-lived discovery connection
+		if err == nil {
+			return replicas, nil
+		}
+		lastErr = err // remember and try the next sentinel
+	}
+	return nil, fmt.Errorf("all sentinels unreachable: %w", lastErr)
+}
+
+// NoopClient is a Client implementation that does nothing, used when Valkey
+// configuration is absent (e.g., local development or auth not yet enabled).
+type NoopClient struct{}
+
+func (NoopClient) UpsertUser(_ context.Context, _, _ string) error      { return nil }
+func (NoopClient) DeleteUser(_ context.Context, _ string) error         { return nil }
+func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil }
+func (NoopClient) WatchFailover(_ context.Context, _ func()) error      { return nil }
+func (NoopClient) Close() error                                         { return nil }
diff --git a/internal/webhook/v1/service_webhook.go b/internal/webhook/v1/service_webhook.go
index b4d9e43..3351c0c 100644
--- a/internal/webhook/v1/service_webhook.go
+++ b/internal/webhook/v1/service_webhook.go
@@ -40,6 +40,7 @@ const (
 	decofileInjectAnnot    = "deco.sites/decofile-inject"
 	decofileMountPathAnnot = "deco.sites/decofile-mount-path"
 	deploymentIdLabel      = "app.deco/deploymentId"
+	valkeyACLSecretName    = "valkey-acl"
 )
 
 // nolint:unused
@@ -290,11 +291,40 @@ func (d *ServiceCustomDefaulter) Default(ctx context.Context, obj runtime.Object
 	}
 	service.Spec.Template.Labels[deploymentIdLabel] = deploymentId
 
+	// Inject valkey-acl Secret as envFrom so pods receive per-tenant Valkey credentials.
+	// optional=true ensures pods start even before the Secret is provisioned by the operator,
+	// falling back to deco's FILE_SYSTEM cache in the meantime.
+	d.addOrUpdateValkeyEnvFrom(service)
+
 	servicelog.Info("Successfully injected Decofile into Service", "service", service.Name, "deploymentId", deploymentId, "configmap", decofile.ConfigMapName())
 
 	return nil
 }
 
+// addOrUpdateValkeyEnvFrom injects the valkey-acl Secret as an envFrom source into the
+// target container. It is idempotent: if the reference already exists, it is left unchanged.
+func (d *ServiceCustomDefaulter) addOrUpdateValkeyEnvFrom(service *servingknativedevv1.Service) {
+	if len(service.Spec.Template.Spec.Containers) == 0 {
+		return // nothing to inject into
+	}
+	idx := d.findTargetContainer(service)
+	optional := true
+	for _, ef := range service.Spec.Template.Spec.Containers[idx].EnvFrom { // same promoted-PodSpec spelling as the guard above
+		if ef.SecretRef != nil && ef.SecretRef.Name == valkeyACLSecretName {
+			return // already present
+		}
+	}
+	service.Spec.Template.Spec.Containers[idx].EnvFrom = append(
+		service.Spec.Template.Spec.Containers[idx].EnvFrom,
+		corev1.EnvFromSource{
+			SecretRef: &corev1.SecretEnvSource{
+				LocalObjectReference: corev1.LocalObjectReference{Name: valkeyACLSecretName},
+				Optional:             &optional,
+			},
+		},
+	)
+}
+
 // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
 // NOTE: The 'path' attribute must follow a specific pattern and should not be modified directly here.
 // Modifying the path for an invalid path can cause API server errors; failing to locate the webhook.