From e0581ef8fed8ca534ba663ad70d02ef5ffa20203 Mon Sep 17 00:00:00 2001 From: decobot Date: Thu, 9 Apr 2026 13:49:34 -0300 Subject: [PATCH 01/17] feat: per-tenant Valkey ACL provisioning Adds a Namespace controller that provisions isolated Valkey ACL users for each site namespace annotated with deco.sites/valkey-acl: "true". Each site gets a K8s Secret (valkey-acl) with its own credentials, restricted to ~{site}:* key patterns. The mutating webhook injects the secret into site pods via envFrom (optional=true, so pods degrade gracefully to FILE_SYSTEM cache if not yet provisioned). A bootstrap CLI annotates existing namespaces for migration. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/bootstrap/main.go | 118 ++++++++++ cmd/main.go | 43 ++++ config/rbac/role.yaml | 14 +- go.mod | 3 + go.sum | 6 + internal/controller/namespace_controller.go | 225 ++++++++++++++++++++ internal/valkey/client.go | 121 +++++++++++ internal/webhook/v1/service_webhook.go | 30 +++ 8 files changed, 559 insertions(+), 1 deletion(-) create mode 100644 cmd/bootstrap/main.go create mode 100644 internal/controller/namespace_controller.go create mode 100644 internal/valkey/client.go diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go new file mode 100644 index 0000000..8248e9c --- /dev/null +++ b/cmd/bootstrap/main.go @@ -0,0 +1,118 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// bootstrap is a one-shot CLI that annotates existing site Namespaces so the +// NamespaceReconciler picks them up and provisions Valkey ACL credentials. +// +// Usage: +// +// go run ./cmd/bootstrap --namespace-pattern sites- +// go run ./cmd/bootstrap --namespace-pattern sites- --dry-run +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +const valkeyACLAnnotation = "deco.sites/valkey-acl" + +func main() { + var namespacePattern string + var dryRun bool + + flag.StringVar(&namespacePattern, "namespace-pattern", "sites-", + "Prefix used to filter Namespaces eligible for Valkey ACL provisioning.") + flag.BoolVar(&dryRun, "dry-run", false, + "Print which Namespaces would be annotated without making changes.") + + opts := zap.Options{Development: true} + opts.BindFlags(flag.CommandLine) + flag.Parse() + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + log := ctrl.Log.WithName("bootstrap") + + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + cfg, err := ctrl.GetConfig() + if err != nil { + log.Error(err, "Failed to get kubeconfig") + os.Exit(1) + } + + k8s, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "Failed to create Kubernetes client") + os.Exit(1) + } + + ctx := context.Background() + + nsList := &corev1.NamespaceList{} + if err := k8s.List(ctx, nsList); err != nil { + log.Error(err, "Failed to list Namespaces") + os.Exit(1) + } + + annotated, skipped := 0, 0 + for i := range nsList.Items { + ns := &nsList.Items[i] + + if !strings.HasPrefix(ns.Name, namespacePattern) { + continue + } + + if 
ns.Annotations[valkeyACLAnnotation] == "true" { + log.Info("Already annotated, skipping", "namespace", ns.Name) + skipped++ + continue + } + + if dryRun { + fmt.Printf("[dry-run] would annotate namespace %s\n", ns.Name) + annotated++ + continue + } + + patch := client.MergeFrom(ns.DeepCopy()) + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations[valkeyACLAnnotation] = "true" + if err := k8s.Patch(ctx, ns, patch); err != nil { + log.Error(err, "Failed to annotate namespace", "namespace", ns.Name) + continue + } + log.Info("Annotated namespace", "namespace", ns.Name) + annotated++ + } + + log.Info("Bootstrap complete", "annotated", annotated, "skipped", skipped) +} diff --git a/cmd/main.go b/cmd/main.go index 6328064..0705d55 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,6 +21,7 @@ import ( "flag" "os" "path/filepath" + "strings" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -41,6 +42,7 @@ import ( decositesv1alpha1 "github.com/deco-sites/decofile-operator/api/v1alpha1" "github.com/deco-sites/decofile-operator/internal/controller" + "github.com/deco-sites/decofile-operator/internal/valkey" webhookv1 "github.com/deco-sites/decofile-operator/internal/webhook/v1" // +kubebuilder:scaffold:imports ) @@ -68,6 +70,9 @@ func main() { var secureMetrics bool var enableHTTP2 bool var tlsOpts []func(*tls.Config) + var valkeySentinelURLs string + var valkeySentinelMaster string + var valkeyAdminPassword string flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. 
"+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -85,6 +90,12 @@ func main() { flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") + flag.StringVar(&valkeySentinelURLs, "valkey-sentinel-urls", os.Getenv("VALKEY_SENTINEL_URLS"), + "Comma-separated list of Valkey Sentinel addresses (host:port). When empty, ACL provisioning is disabled.") + flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master", getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"), + "Valkey Sentinel master name.") + flag.StringVar(&valkeyAdminPassword, "valkey-admin-password", os.Getenv("VALKEY_ADMIN_PASSWORD"), + "Password for the Valkey admin user used to manage ACLs.") opts := zap.Options{ Development: true, } @@ -206,6 +217,31 @@ func main() { os.Exit(1) } + // Build Valkey client. Falls back to a no-op client when Sentinel URLs are not configured + // so the operator works in environments where Valkey ACL provisioning is not needed. 
+ var valkeyClient valkey.Client + if valkeySentinelURLs != "" { + valkeyClient = valkey.NewSentinelClient(valkey.Config{ + SentinelAddrs: strings.Split(valkeySentinelURLs, ","), + MasterName: valkeySentinelMaster, + AdminPassword: valkeyAdminPassword, + }) + defer func() { _ = valkeyClient.Close() }() + setupLog.Info("Valkey ACL provisioning enabled", "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) + } else { + valkeyClient = valkey.NoopClient{} + setupLog.Info("Valkey ACL provisioning disabled (VALKEY_SENTINEL_URLS not set)") + } + + if err := (&controller.NamespaceReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ValkeyClient: valkeyClient, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Namespace") + os.Exit(1) + } + // Create shared HTTP client for pod notifications to prevent connection leaks httpClient := controller.NewHTTPClient() @@ -261,3 +297,10 @@ func main() { os.Exit(1) } } + +func getEnvOrDefault(key, defaultVal string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultVal +} diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index d547341..3e64d78 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -8,6 +8,7 @@ rules: - "" resources: - configmaps + - secrets verbs: - create - delete @@ -16,11 +17,20 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: - pods - - secrets verbs: - get - list @@ -58,4 +68,6 @@ rules: verbs: - get - list + - patch + - update - watch diff --git a/go.mod b/go.mod index c0b8155..7f73982 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous 
v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -56,6 +57,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.17.0 // indirect + github.com/redis/go-redis/v9 v9.18.0 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.10 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect @@ -69,6 +71,7 @@ require ( go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect diff --git a/go.sum b/go.sum index ecfa09d..338285b 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls= @@ -114,6 +116,8 @@ github.com/prometheus/common v0.66.1 
h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= +github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -162,6 +166,8 @@ go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42s go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4= go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go new file mode 100644 index 0000000..7a52d7b --- /dev/null +++ b/internal/controller/namespace_controller.go @@ -0,0 +1,225 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "crypto/rand" + "encoding/base64" + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + servingknativedevv1 "knative.dev/serving/pkg/apis/serving/v1" + + "github.com/deco-sites/decofile-operator/internal/valkey" +) + +const ( + valkeyACLAnnotation = "deco.sites/valkey-acl" + valkeyACLFinalizer = "deco.sites/valkey-acl" + valkeySecretName = "valkey-acl" + valkeyProvisionedAnnot = "deco.sites/valkey-acl-provisioned" + + siteNamespacePrefix = "sites-" +) + +// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;update;patch + +// NamespaceReconciler provisions per-tenant Valkey ACL credentials for site namespaces. +// When a Namespace has the annotation "deco.sites/valkey-acl: true", the reconciler: +// - Creates a Valkey ACL user restricted to the site's key prefix. +// - Creates a K8s Secret "valkey-acl" in that namespace with the credentials. 
+//   - Patches the Knative Service to trigger a new Revision that picks up the Secret.
+//   - Cleans up the Valkey ACL user when the namespace is deleted.
+type NamespaceReconciler struct {
+	client.Client
+	Scheme       *runtime.Scheme
+	ValkeyClient valkey.Client
+}
+
+// SetupWithManager registers the Namespace controller with a resync period for
+// self-healing (recovers ACLs lost after a Valkey restart).
+func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&corev1.Namespace{}).
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: 4,
+		}).
+		Named("namespace-valkey").
+		Complete(r)
+}
+
+func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	log := logf.FromContext(ctx).WithValues("namespace", req.Name)
+
+	ns := &corev1.Namespace{}
+	if err := r.Get(ctx, req.NamespacedName, ns); err != nil {
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
+
+	siteName := siteNameFromNamespace(ns.Name)
+
+	// Handle deletion before the opt-in gate: a finalizer added earlier must be released even if the annotation was removed since, otherwise the namespace never finishes terminating.
+	if !ns.DeletionTimestamp.IsZero() {
+		if controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+			log.Info("Deleting Valkey ACL user", "user", siteName)
+			if err := r.ValkeyClient.DeleteUser(ctx, siteName); err != nil {
+				log.Error(err, "Failed to delete Valkey ACL user, will retry")
+				return ctrl.Result{}, err
+			}
+			controllerutil.RemoveFinalizer(ns, valkeyACLFinalizer)
+			if err := r.Update(ctx, ns); err != nil {
+				return ctrl.Result{}, err
+			}
+		}
+		return ctrl.Result{}, nil
+	}
+
+	// Only provision namespaces that carry the opt-in annotation; everything below is skipped otherwise.
+	if ns.Annotations[valkeyACLAnnotation] != "true" {
+		return ctrl.Result{}, nil
+	}
+
+	// Ensure finalizer is present so we can clean up on deletion.
+	if !controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+		controllerutil.AddFinalizer(ns, valkeyACLFinalizer)
+		if err := r.Update(ctx, ns); err != nil {
+			return ctrl.Result{}, err
+		}
+	}
+
+	// Check whether the credential Secret already exists.
+	secret := &corev1.Secret{}
+	secretKey := types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}
+	err := r.Get(ctx, secretKey, secret)
+
+	switch {
+	case errors.IsNotFound(err):
+		// Secret does not exist: generate credentials, create ACL user and Secret.
+		password, genErr := generatePassword()
+		if genErr != nil {
+			return ctrl.Result{}, fmt.Errorf("generate password: %w", genErr)
+		}
+
+		log.Info("Provisioning Valkey ACL user", "user", siteName)
+		if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+			return ctrl.Result{}, fmt.Errorf("upsert Valkey user: %w", upsertErr)
+		}
+
+		if createErr := r.createSecret(ctx, ns.Name, siteName, password); createErr != nil {
+			return ctrl.Result{}, fmt.Errorf("create secret: %w", createErr)
+		}
+
+		// Trigger a new Knative Revision so running pods pick up the new Secret.
+		if patchErr := r.patchKnativeServiceTimestamp(ctx, ns.Name); patchErr != nil {
+			log.Error(patchErr, "Failed to patch Knative Service (non-fatal)")
+		}
+
+		log.Info("Valkey ACL provisioned", "user", siteName)
+
+	case err != nil:
+		return ctrl.Result{}, fmt.Errorf("get secret: %w", err)
+
+	default:
+		// Secret exists: verify the ACL user is present in Valkey (self-healing after restart).
+		exists, checkErr := r.ValkeyClient.UserExists(ctx, siteName)
+		if checkErr != nil {
+			return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr)
+		}
+		if !exists {
+			password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"])
+			log.Info("Valkey ACL user missing (Valkey restarted?), re-provisioning", "user", siteName)
+			if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+				return ctrl.Result{}, fmt.Errorf("re-upsert Valkey user: %w", upsertErr)
+			}
+		}
+	}
+
+	// Requeue periodically to self-heal ACLs lost after Valkey restarts.
+	return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil
+}
+
+// createSecret creates the "valkey-acl" Secret in the given namespace with
+// credentials ready to be consumed by deco via LOADER_CACHE_REDIS_USERNAME/PASSWORD.
+func (r *NamespaceReconciler) createSecret(ctx context.Context, namespace, username, password string) error {
+	secret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      valkeySecretName,
+			Namespace: namespace,
+		},
+		StringData: map[string]string{
+			"LOADER_CACHE_REDIS_USERNAME": username,
+			"LOADER_CACHE_REDIS_PASSWORD": password,
+		},
+	}
+	return r.Create(ctx, secret)
+}
+
+// patchKnativeServiceTimestamp adds/updates the "deco.sites/valkey-acl-provisioned"
+// annotation on every Knative Service in the namespace. This causes Knative to create
+// a new Revision whose pods will mount the just-created valkey-acl Secret.
+func (r *NamespaceReconciler) patchKnativeServiceTimestamp(ctx context.Context, namespace string) error { + log := logf.FromContext(ctx) + + svcList := &servingknativedevv1.ServiceList{} + if err := r.List(ctx, svcList, client.InNamespace(namespace)); err != nil { + return fmt.Errorf("list Knative Services: %w", err) + } + + now := time.Now().UTC().Format(time.RFC3339) + for i := range svcList.Items { + svc := &svcList.Items[i] + patch := client.MergeFrom(svc.DeepCopy()) + if svc.Annotations == nil { + svc.Annotations = make(map[string]string) + } + svc.Annotations[valkeyProvisionedAnnot] = now + if err := r.Patch(ctx, svc, patch); err != nil { + log.Error(err, "Failed to patch Knative Service", "service", svc.Name) + } + } + return nil +} + +// siteNameFromNamespace derives the Valkey ACL username from the K8s namespace name. +// The "sites-" prefix is stripped when present so the username matches DECO_SITE_NAME. +func siteNameFromNamespace(namespace string) string { + return strings.TrimPrefix(namespace, siteNamespacePrefix) +} + +// generatePassword produces a cryptographically random 32-byte URL-safe base64 string. +func generatePassword() (string, error) { + b := make([]byte, 32) + if _, err := rand.Read(b); err != nil { + return "", err + } + return base64.URLEncoding.EncodeToString(b), nil +} diff --git a/internal/valkey/client.go b/internal/valkey/client.go new file mode 100644 index 0000000..ca32fbb --- /dev/null +++ b/internal/valkey/client.go @@ -0,0 +1,121 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package valkey + +import ( + "context" + "fmt" + "strings" + + "github.com/redis/go-redis/v9" +) + +// Client manages Valkey ACL users for tenant isolation. +type Client interface { + // UpsertUser creates or replaces a Valkey ACL user restricted to the given prefix. + UpsertUser(ctx context.Context, username, password string) error + // DeleteUser removes a Valkey ACL user. + DeleteUser(ctx context.Context, username string) error + // UserExists checks whether a Valkey ACL user exists. + UserExists(ctx context.Context, username string) (bool, error) + // Close releases the underlying connection. + Close() error +} + +// Config holds the configuration required to connect to Valkey via Sentinel. +type Config struct { + SentinelAddrs []string + MasterName string + AdminPassword string +} + +type sentinelClient struct { + rdb *redis.Client +} + +// NewSentinelClient returns a Client backed by a Sentinel-aware connection. +// The underlying go-redis FailoverClient resolves the current master automatically +// and reconnects after a Sentinel failover. +func NewSentinelClient(cfg Config) Client { + rdb := redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: cfg.MasterName, + SentinelAddrs: cfg.SentinelAddrs, + Password: cfg.AdminPassword, + }) + return &sentinelClient{rdb: rdb} +} + +// UpsertUser issues ACL SETUSER to create or replace the per-tenant user. +// The user is restricted to keys matching {username}:* and lock:{username}:*. +// nocommands resets any prior permission, then we add only what deco needs. 
+func (c *sentinelClient) UpsertUser(ctx context.Context, username, password string) error {
+	args := []interface{}{
+		"ACL", "SETUSER", username,
+		"on",
+		"resetpass", ">" + password, // ">" only appends a password; resetpass first so a re-provision replaces the old one
+		"resetkeys",
+		"~" + username + ":*",
+		"~lock:" + username + ":*",
+		"resetchannels",
+		"nocommands",
+		"+@read",
+		"+@write",
+		"+ping",
+	}
+	if err := c.rdb.Do(ctx, args...).Err(); err != nil {
+		return fmt.Errorf("ACL SETUSER %s: %w", username, err)
+	}
+	return nil
+}
+
+// DeleteUser issues ACL DELUSER to remove the per-tenant user.
+// Returns nil if the user does not exist.
+func (c *sentinelClient) DeleteUser(ctx context.Context, username string) error {
+	if err := c.rdb.Do(ctx, "ACL", "DELUSER", username).Err(); err != nil {
+		return fmt.Errorf("ACL DELUSER %s: %w", username, err)
+	}
+	return nil
+}
+
+// UserExists checks for the presence of a Valkey ACL user via ACL GETUSER.
+func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, error) {
+	err := c.rdb.Do(ctx, "ACL", "GETUSER", username).Err()
+	if err == nil {
+		return true, nil
+	}
+	// An unknown user comes back as a null reply (redis.Nil in go-redis); the "No such user" match is kept for servers that answer with an error instead.
+	if err == redis.Nil || strings.Contains(err.Error(), "No such user") {
+		return false, nil
+	}
+	return false, fmt.Errorf("ACL GETUSER %s: %w", username, err)
+}
+
+// Close closes the underlying Redis connection pool.
+func (c *sentinelClient) Close() error {
+	return c.rdb.Close()
+}
+
+// NoopClient is a Client implementation that does nothing, used when Valkey
+// configuration is absent (e.g., local development or auth not yet enabled).
+type NoopClient struct{} + +func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } +func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } +func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { + return false, nil +} +func (NoopClient) Close() error { return nil } diff --git a/internal/webhook/v1/service_webhook.go b/internal/webhook/v1/service_webhook.go index b4d9e43..3351c0c 100644 --- a/internal/webhook/v1/service_webhook.go +++ b/internal/webhook/v1/service_webhook.go @@ -40,6 +40,7 @@ const ( decofileInjectAnnot = "deco.sites/decofile-inject" decofileMountPathAnnot = "deco.sites/decofile-mount-path" deploymentIdLabel = "app.deco/deploymentId" + valkeyACLSecretName = "valkey-acl" ) // nolint:unused @@ -290,11 +291,40 @@ func (d *ServiceCustomDefaulter) Default(ctx context.Context, obj runtime.Object } service.Spec.Template.Labels[deploymentIdLabel] = deploymentId + // Inject valkey-acl Secret as envFrom so pods receive per-tenant Valkey credentials. + // optional=true ensures pods start even before the Secret is provisioned by the operator, + // falling back to deco's FILE_SYSTEM cache in the meantime. + d.addOrUpdateValkeyEnvFrom(service) + servicelog.Info("Successfully injected Decofile into Service", "service", service.Name, "deploymentId", deploymentId, "configmap", decofile.ConfigMapName()) return nil } +// addOrUpdateValkeyEnvFrom injects the valkey-acl Secret as an envFrom source into the +// target container. It is idempotent: if the reference already exists, it is left unchanged. 
+func (d *ServiceCustomDefaulter) addOrUpdateValkeyEnvFrom(service *servingknativedevv1.Service) { + if len(service.Spec.Template.Spec.Containers) == 0 { + return + } + idx := d.findTargetContainer(service) + optional := true + for _, ef := range service.Spec.Template.Spec.PodSpec.Containers[idx].EnvFrom { + if ef.SecretRef != nil && ef.SecretRef.Name == valkeyACLSecretName { + return // already present + } + } + service.Spec.Template.Spec.PodSpec.Containers[idx].EnvFrom = append( + service.Spec.Template.Spec.PodSpec.Containers[idx].EnvFrom, + corev1.EnvFromSource{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: valkeyACLSecretName}, + Optional: &optional, + }, + }, + ) +} + // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. // NOTE: The 'path' attribute must follow a specific pattern and should not be modified directly here. // Modifying the path for an invalid path can cause API server errors; failing to locate the webhook. From 8228744f0c94e75f2dbd33f062eacff08ffe9b4e Mon Sep 17 00:00:00 2001 From: decobot Date: Sat, 11 Apr 2026 20:34:26 -0300 Subject: [PATCH 02/17] chore: update Helm chart with Valkey ACL env vars Adds valkey.sentinelUrls, valkey.sentinelMasterName, and valkey.adminPassword to values.yaml and the deployment template. Updates helm-generator to emit the Valkey env block alongside the existing GITHUB_TOKEN block. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../clusterrole-operator-manager-role.yaml | 14 +++++++++++- ...eployment-operator-controller-manager.yaml | 14 +++++++++++- chart/values.yaml | 9 ++++++++ hack/helm-generator/main.go | 22 ++++++++++++++----- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/chart/templates/clusterrole-operator-manager-role.yaml b/chart/templates/clusterrole-operator-manager-role.yaml index 993fb50..8551f75 100644 --- a/chart/templates/clusterrole-operator-manager-role.yaml +++ b/chart/templates/clusterrole-operator-manager-role.yaml @@ -7,6 +7,7 @@ rules: - "" resources: - configmaps + - secrets verbs: - create - delete @@ -15,11 +16,20 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: - pods - - secrets verbs: - get - list @@ -57,4 +67,6 @@ rules: verbs: - get - list + - patch + - update - watch \ No newline at end of file diff --git a/chart/templates/deployment-operator-controller-manager.yaml b/chart/templates/deployment-operator-controller-manager.yaml index 466d680..9b7d104 100644 --- a/chart/templates/deployment-operator-controller-manager.yaml +++ b/chart/templates/deployment-operator-controller-manager.yaml @@ -31,11 +31,23 @@ spec: command: - /manager image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - {{- if .Values.github.token }} + {{- if or .Values.github.token .Values.valkey.sentinelUrls }} env: + {{- if .Values.github.token }} - name: GITHUB_TOKEN value: {{ .Values.github.token | quote }} {{- end }} + {{- if .Values.valkey.sentinelUrls }} + - name: VALKEY_SENTINEL_URLS + value: {{ .Values.valkey.sentinelUrls | quote }} + - name: VALKEY_SENTINEL_MASTER_NAME + value: {{ .Values.valkey.sentinelMasterName | quote }} + {{- if .Values.valkey.adminPassword }} + - name: VALKEY_ADMIN_PASSWORD + value: {{ .Values.valkey.adminPassword | quote }} + {{- end }} + {{- end }} + {{- end 
}} livenessProbe: httpGet: path: /healthz diff --git a/chart/values.yaml b/chart/values.yaml index 01746eb..9719e79 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -15,6 +15,15 @@ replicaCount: 1 github: token: "" +# Valkey (Redis) ACL provisioning +# When sentinelUrls is set, the operator provisions per-tenant ACL users in Valkey +# for each namespace annotated with deco.sites/valkey-acl: "true". +# Leave sentinelUrls empty to disable ACL provisioning (NoopClient). +valkey: + sentinelUrls: "" + sentinelMasterName: "mymaster" + adminPassword: "" + # Resource limits and requests resources: limits: diff --git a/hack/helm-generator/main.go b/hack/helm-generator/main.go index 493fac0..2a6de91 100644 --- a/hack/helm-generator/main.go +++ b/hack/helm-generator/main.go @@ -83,9 +83,9 @@ func main() { fileCount++ } - // Add GITHUB_TOKEN to deployment - if err := addGitHubTokenToDeployment(templatesDir); err != nil { - fmt.Fprintf(os.Stderr, "Warning: Could not add GITHUB_TOKEN: %v\n", err) + // Add env vars to deployment + if err := addEnvVarsToDeployment(templatesDir); err != nil { + fmt.Fprintf(os.Stderr, "Warning: Could not add env vars to deployment: %v\n", err) } fmt.Printf("✓ Generated %d Helm templates\n\n", fileCount) @@ -172,7 +172,7 @@ func addConditionals(content, kind string) string { return content } -func addGitHubTokenToDeployment(templatesDir string) error { +func addEnvVarsToDeployment(templatesDir string) error { files, err := filepath.Glob(filepath.Join(templatesDir, "deployment-*.yaml")) if err != nil || len(files) == 0 { return fmt.Errorf("no deployment file found") @@ -187,10 +187,22 @@ func addGitHubTokenToDeployment(templatesDir string) error { contentStr := string(content) // Find the image line and add env vars after it - envBlock := ` {{- if .Values.github.token }} + envBlock := ` {{- if or .Values.github.token .Values.valkey.sentinelUrls }} env: + {{- if .Values.github.token }} - name: GITHUB_TOKEN value: {{ .Values.github.token | 
quote }} + {{- end }} + {{- if .Values.valkey.sentinelUrls }} + - name: VALKEY_SENTINEL_URLS + value: {{ .Values.valkey.sentinelUrls | quote }} + - name: VALKEY_SENTINEL_MASTER_NAME + value: {{ .Values.valkey.sentinelMasterName | quote }} + {{- if .Values.valkey.adminPassword }} + - name: VALKEY_ADMIN_PASSWORD + value: {{ .Values.valkey.adminPassword | quote }} + {{- end }} + {{- end }} {{- end }}` re := regexp.MustCompile(`(?m)( image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}")`) From cf90b595ca9634ebeb1a4e11782e23a89b09c6cb Mon Sep 17 00:00:00 2001 From: decobot Date: Sat, 11 Apr 2026 21:17:14 -0300 Subject: [PATCH 03/17] fix: patch spec.template annotations to trigger Knative Revision Patching metadata.annotations on a Knative Service does not create a new Revision. Must patch spec.template.annotations so Knative detects the change and starts new pods that mount the valkey-acl Secret. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 7a52d7b..2f4fee4 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -198,10 +198,12 @@ func (r *NamespaceReconciler) patchKnativeServiceTimestamp(ctx context.Context, for i := range svcList.Items { svc := &svcList.Items[i] patch := client.MergeFrom(svc.DeepCopy()) - if svc.Annotations == nil { - svc.Annotations = make(map[string]string) + // Must annotate spec.template, not metadata — Knative only creates a new + // Revision when spec.template changes. 
+ if svc.Spec.Template.Annotations == nil { + svc.Spec.Template.Annotations = make(map[string]string) } - svc.Annotations[valkeyProvisionedAnnot] = now + svc.Spec.Template.Annotations[valkeyProvisionedAnnot] = now if err := r.Patch(ctx, svc, patch); err != nil { log.Error(err, "Failed to patch Knative Service", "service", svc.Name) } From 02d1e3786d03c5854e555302e025095649f0870b Mon Sep 17 00:00:00 2001 From: decobot Date: Sat, 11 Apr 2026 21:30:46 -0300 Subject: [PATCH 04/17] fix: watch valkey-acl Secret deletions to trigger immediate reconcile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Owns() cannot be used here because Namespace is cluster-scoped and Secret is namespace-scoped — K8s does not support cross-scope owner references. Use Watches with a mapper instead so that a manual Secret deletion triggers an immediate reconcile rather than waiting up to the 10-minute RequeueAfter window. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 2f4fee4..99530d8 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -33,7 +33,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" servingknativedevv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -68,8 +70,23 @@ type NamespaceReconciler struct { // SetupWithManager registers the Namespace controller with a resync period for // self-healing (recovers ACLs lost after a Valkey restart). 
func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Watch Secrets named "valkey-acl" and enqueue the parent Namespace. + // Namespace is cluster-scoped so Owns() (which relies on owner references) cannot + // be used across scopes. Instead we map Secret → Namespace by name. + secretToNamespace := handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, obj client.Object) []reconcile.Request { + if obj.GetName() != valkeySecretName { + return nil + } + return []reconcile.Request{ + {NamespacedName: types.NamespacedName{Name: obj.GetNamespace()}}, + } + }, + ) + return ctrl.NewControllerManagedBy(mgr). For(&corev1.Namespace{}). + Watches(&corev1.Secret{}, secretToNamespace). WithOptions(controller.Options{ MaxConcurrentReconciles: 4, }). From fc532476c2b8b9bbf3b50b85b7b61194633597b8 Mon Sep 17 00:00:00 2001 From: decobot Date: Sat, 11 Apr 2026 21:50:48 -0300 Subject: [PATCH 05/17] feat: add VALKEY_URL direct connection for local development NewDirectClient connects to a single Valkey instance without Sentinel. VALKEY_URL takes precedence over VALKEY_SENTINEL_URLS so local experiments don't require a full Sentinel setup. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/main.go | 16 ++++++++++++---- internal/valkey/client.go | 10 ++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 0705d55..1b87e57 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -70,6 +70,7 @@ func main() { var secureMetrics bool var enableHTTP2 bool var tlsOpts []func(*tls.Config) + var valkeyURL string var valkeySentinelURLs string var valkeySentinelMaster string var valkeyAdminPassword string @@ -90,6 +91,8 @@ func main() { flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") + flag.StringVar(&valkeyURL, "valkey-url", os.Getenv("VALKEY_URL"), + "Direct Valkey address (host:port). Takes precedence over sentinel. For local dev only.") flag.StringVar(&valkeySentinelURLs, "valkey-sentinel-urls", os.Getenv("VALKEY_SENTINEL_URLS"), "Comma-separated list of Valkey Sentinel addresses (host:port). When empty, ACL provisioning is disabled.") flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master", getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"), @@ -220,17 +223,22 @@ func main() { // Build Valkey client. Falls back to a no-op client when Sentinel URLs are not configured // so the operator works in environments where Valkey ACL provisioning is not needed. 
var valkeyClient valkey.Client - if valkeySentinelURLs != "" { + switch { + case valkeyURL != "": + valkeyClient = valkey.NewDirectClient(valkeyURL, valkeyAdminPassword) + defer func() { _ = valkeyClient.Close() }() + setupLog.Info("Valkey ACL provisioning enabled (direct)", "addr", valkeyURL) + case valkeySentinelURLs != "": valkeyClient = valkey.NewSentinelClient(valkey.Config{ SentinelAddrs: strings.Split(valkeySentinelURLs, ","), MasterName: valkeySentinelMaster, AdminPassword: valkeyAdminPassword, }) defer func() { _ = valkeyClient.Close() }() - setupLog.Info("Valkey ACL provisioning enabled", "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) - } else { + setupLog.Info("Valkey ACL provisioning enabled (sentinel)", "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) + default: valkeyClient = valkey.NoopClient{} - setupLog.Info("Valkey ACL provisioning disabled (VALKEY_SENTINEL_URLS not set)") + setupLog.Info("Valkey ACL provisioning disabled (set VALKEY_URL or VALKEY_SENTINEL_URLS)") } if err := (&controller.NamespaceReconciler{ diff --git a/internal/valkey/client.go b/internal/valkey/client.go index ca32fbb..2c71fd3 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -47,6 +47,16 @@ type sentinelClient struct { rdb *redis.Client } +// NewDirectClient returns a Client with a direct connection to a single Valkey instance. +// Intended for local development and testing — not for production use. +func NewDirectClient(addr, password string) Client { + rdb := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + }) + return &sentinelClient{rdb: rdb} +} + // NewSentinelClient returns a Client backed by a Sentinel-aware connection. // The underlying go-redis FailoverClient resolves the current master automatically // and reconnects after a Sentinel failover. 
From 6f9b4079b5a099cd6acadcbda4f1115e7aba60ac Mon Sep 17 00:00:00 2001 From: decobot Date: Sat, 11 Apr 2026 22:33:07 -0300 Subject: [PATCH 06/17] fix: resetpass before setting new password in ACL SETUSER Without resetpass, ACL SETUSER accumulates password hashes instead of replacing them. This causes stale hashes to remain after the Secret is recreated with a new password (e.g. after self-healing). Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/valkey/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/valkey/client.go b/internal/valkey/client.go index 2c71fd3..8931ed1 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -76,6 +76,7 @@ func (c *sentinelClient) UpsertUser(ctx context.Context, username, password stri args := []interface{}{ "ACL", "SETUSER", username, "on", + "resetpass", // clear any previously stored password hashes ">" + password, "resetkeys", "~" + username + ":*", From 9f0fc41469bd8cee6cf685c3af3b2c35aed0af9f Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 10:20:44 -0300 Subject: [PATCH 07/17] feat: add custom metrics and fix redis.Nil in UserExists Metrics added: - deco_operator_valkey_acl_provisioned_total - deco_operator_valkey_acl_deleted_total - deco_operator_valkey_acl_errors_total{operation} - deco_operator_valkey_acl_self_healed_total - deco_operator_valkey_tenants_provisioned (gauge) Also fixes UserExists to treat redis.Nil as "user not found" instead of returning an error, stopping the reconcile error loop. Adds V(1) log for no-op reconciles and cleaner log messages. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/metrics.go | 74 +++++++++++++++++++++ internal/controller/namespace_controller.go | 19 +++++- internal/valkey/client.go | 7 +- 3 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 internal/controller/metrics.go diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go new file mode 100644 index 0000000..13316c5 --- /dev/null +++ b/internal/controller/metrics.go @@ -0,0 +1,74 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +var ( + // valkeyACLProvisioned counts successful ACL user + Secret provisioning operations. + valkeyACLProvisioned = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_provisioned_total", + Help: "Total number of Valkey ACL users provisioned (new or re-provisioned).", + }) + + // valkeyACLDeleted counts ACL user deletions triggered by namespace removal. + valkeyACLDeleted = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_deleted_total", + Help: "Total number of Valkey ACL users deleted on namespace removal.", + }) + + // valkeyACLErrors counts failures when interacting with Valkey. 
+ valkeyACLErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_errors_total", + Help: "Total number of Valkey ACL operation errors by operation type.", + }, []string{"operation"}) // operation: upsert | delete | check + + // valkeyACLSelfHealed counts how many times an ACL was re-created after Valkey restart. + valkeyACLSelfHealed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "acl_self_healed_total", + Help: "Total number of Valkey ACL users re-provisioned after being lost (e.g. Valkey restart).", + }) + + // valkeyTenantsProvisioned tracks the current number of provisioned tenants (gauge). + valkeyTenantsProvisioned = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "tenants_provisioned", + Help: "Current number of site namespaces with a provisioned Valkey ACL user.", + }) +) + +func init() { + metrics.Registry.MustRegister( + valkeyACLProvisioned, + valkeyACLDeleted, + valkeyACLErrors, + valkeyACLSelfHealed, + valkeyTenantsProvisioned, + ) +} diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 99530d8..132ad79 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -115,8 +115,12 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( log.Info("Deleting Valkey ACL user", "user", siteName) if err := r.ValkeyClient.DeleteUser(ctx, siteName); err != nil { log.Error(err, "Failed to delete Valkey ACL user, will retry") + valkeyACLErrors.WithLabelValues("delete").Inc() return ctrl.Result{}, err } + valkeyACLDeleted.Inc() + valkeyTenantsProvisioned.Dec() + log.Info("Valkey ACL user deleted", "user", siteName) controllerutil.RemoveFinalizer(ns, valkeyACLFinalizer) if err := r.Update(ctx, ns); err != nil { return ctrl.Result{}, err @@ -148,6 +152,7 
@@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( log.Info("Provisioning Valkey ACL user", "user", siteName) if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil { + valkeyACLErrors.WithLabelValues("upsert").Inc() return ctrl.Result{}, fmt.Errorf("upsert Valkey user: %w", upsertErr) } @@ -155,13 +160,15 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, fmt.Errorf("create secret: %w", createErr) } + valkeyACLProvisioned.Inc() + valkeyTenantsProvisioned.Inc() + log.Info("Valkey ACL provisioned", "user", siteName, "namespace", ns.Name) + // Trigger a new Knative Revision so running pods pick up the new Secret. if patchErr := r.patchKnativeServiceTimestamp(ctx, ns.Name); patchErr != nil { log.Error(patchErr, "Failed to patch Knative Service (non-fatal)") } - log.Info("Valkey ACL provisioned", "user", siteName) - case err != nil: return ctrl.Result{}, fmt.Errorf("get secret: %w", err) @@ -169,14 +176,20 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // Secret exists: verify the ACL user is present in Valkey (self-healing after restart). 
exists, checkErr := r.ValkeyClient.UserExists(ctx, siteName) if checkErr != nil { + valkeyACLErrors.WithLabelValues("check").Inc() return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr) } if !exists { password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"]) - log.Info("Valkey ACL user missing (Valkey restarted?), re-provisioning", "user", siteName) + log.Info("Valkey ACL user missing, re-provisioning", "user", siteName, "reason", "Valkey restart or external deletion") if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil { + valkeyACLErrors.WithLabelValues("upsert").Inc() return ctrl.Result{}, fmt.Errorf("re-upsert Valkey user: %w", upsertErr) } + valkeyACLSelfHealed.Inc() + log.Info("Valkey ACL user re-provisioned", "user", siteName) + } else { + log.V(1).Info("Valkey ACL user OK", "user", siteName) } } diff --git a/internal/valkey/client.go b/internal/valkey/client.go index 8931ed1..5a2661d 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -18,6 +18,7 @@ package valkey import ( "context" + "errors" "fmt" "strings" @@ -108,7 +109,11 @@ func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, if err == nil { return true, nil } - // Valkey returns an error with "ERR No such user" when the user does not exist. + // redis.Nil means the command returned a nil response — treat as not found. + if errors.Is(err, redis.Nil) { + return false, nil + } + // Valkey returns "ERR No such user" for unknown usernames. if strings.Contains(err.Error(), "No such user") { return false, nil } From adb8cba266a0c144341c343cb79a5fa86b5b7a69 Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 20:27:39 -0300 Subject: [PATCH 08/17] fix: seed tenants_provisioned gauge from cluster state on startup Without initialization, the gauge resets to 0 on every operator restart. 
InitMetrics counts namespaces with the annotation that already have a valkey-acl Secret and sets the gauge accordingly, so the metric reflects reality regardless of restarts. Also adds context and fmt imports to cmd/main.go. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/main.go | 18 ++++++++++++++++-- internal/controller/namespace_controller.go | 21 +++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1b87e57..d738aca 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,8 +17,10 @@ limitations under the License. package main import ( + "context" "crypto/tls" "flag" + "fmt" "os" "path/filepath" "strings" @@ -31,6 +33,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -241,14 +244,25 @@ func main() { setupLog.Info("Valkey ACL provisioning disabled (set VALKEY_URL or VALKEY_SENTINEL_URLS)") } - if err := (&controller.NamespaceReconciler{ + nsReconciler := &controller.NamespaceReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), ValkeyClient: valkeyClient, - }).SetupWithManager(mgr); err != nil { + } + if err := nsReconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Namespace") os.Exit(1) } + // Seed the tenants_provisioned gauge from current cluster state once the cache is warm. 
+ if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + if !mgr.GetCache().WaitForCacheSync(ctx) { + return fmt.Errorf("cache never synced") + } + return nsReconciler.InitMetrics(ctx) + })); err != nil { + setupLog.Error(err, "unable to add metrics init runnable") + os.Exit(1) + } // Create shared HTTP client for pod notifications to prevent connection leaks httpClient := controller.NewHTTPClient() diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 132ad79..d3643b7 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -67,6 +67,27 @@ type NamespaceReconciler struct { ValkeyClient valkey.Client } +// InitMetrics seeds the tenants_provisioned gauge from current cluster state. +// Must be called after the cache is synced (i.e. inside a Runnable or after mgr.Start). +func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error { + nsList := &corev1.NamespaceList{} + if err := r.List(ctx, nsList); err != nil { + return err + } + count := 0.0 + for _, ns := range nsList.Items { + if ns.Annotations[valkeyACLAnnotation] != "true" { + continue + } + secret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}, secret); err == nil { + count++ + } + } + valkeyTenantsProvisioned.Set(count) + return nil +} + // SetupWithManager registers the Namespace controller with a resync period for // self-healing (recovers ACLs lost after a Valkey restart). 
func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error { From 80f0f65022a83a511c11eb12d5ce1823a13ca2d7 Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 20:31:52 -0300 Subject: [PATCH 09/17] fix: gofmt and lll lint errors Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/main.go | 8 +++++--- internal/controller/namespace_controller.go | 6 +++--- internal/valkey/client.go | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index d738aca..aafef9d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -33,10 +33,10 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -98,7 +98,8 @@ func main() { "Direct Valkey address (host:port). Takes precedence over sentinel. For local dev only.") flag.StringVar(&valkeySentinelURLs, "valkey-sentinel-urls", os.Getenv("VALKEY_SENTINEL_URLS"), "Comma-separated list of Valkey Sentinel addresses (host:port). 
When empty, ACL provisioning is disabled.") - flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master", getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"), + flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master", + getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"), "Valkey Sentinel master name.") flag.StringVar(&valkeyAdminPassword, "valkey-admin-password", os.Getenv("VALKEY_ADMIN_PASSWORD"), "Password for the Valkey admin user used to manage ACLs.") @@ -238,7 +239,8 @@ func main() { AdminPassword: valkeyAdminPassword, }) defer func() { _ = valkeyClient.Close() }() - setupLog.Info("Valkey ACL provisioning enabled (sentinel)", "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) + setupLog.Info("Valkey ACL provisioning enabled (sentinel)", + "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster) default: valkeyClient = valkey.NoopClient{} setupLog.Info("Valkey ACL provisioning disabled (set VALKEY_URL or VALKEY_SENTINEL_URLS)") diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index d3643b7..78fd1c6 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -43,9 +43,9 @@ import ( ) const ( - valkeyACLAnnotation = "deco.sites/valkey-acl" - valkeyACLFinalizer = "deco.sites/valkey-acl" - valkeySecretName = "valkey-acl" + valkeyACLAnnotation = "deco.sites/valkey-acl" + valkeyACLFinalizer = "deco.sites/valkey-acl" + valkeySecretName = "valkey-acl" valkeyProvisionedAnnot = "deco.sites/valkey-acl-provisioned" siteNamespacePrefix = "sites-" diff --git a/internal/valkey/client.go b/internal/valkey/client.go index 5a2661d..a60eb76 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -77,7 +77,7 @@ func (c *sentinelClient) UpsertUser(ctx context.Context, username, password stri args := []interface{}{ "ACL", "SETUSER", username, "on", - "resetpass", // clear any previously stored password hashes + 
"resetpass", // clear any previously stored password hashes ">" + password, "resetkeys", "~" + username + ":*", @@ -130,7 +130,7 @@ func (c *sentinelClient) Close() error { type NoopClient struct{} func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } -func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } +func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil } From d71c0cdbb8e51d1c08013e74e54bad26546d89bb Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 21:40:24 -0300 Subject: [PATCH 10/17] docs: document sync triggers and ACL replication caveat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Explains what triggers a reconcile, how to force an immediate full resync via kubectl annotate, and the important caveat that Valkey does not replicate ACL commands to replicas — which means read replicas and new masters after failover won't have per-tenant ACLs until the next reconcile cycle. Adds a TODO for when auth is enabled in production. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 28 +++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 78fd1c6..ad0f03f 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -61,6 +61,34 @@ const ( // - Creates a K8s Secret "valkey-acl" in that namespace with the credentials. // - Patches the Knative Service to trigger a new Revision that picks up the Secret. // - Cleans up the Valkey ACL user when the namespace is deleted. 
+// +// # What triggers a reconcile +// +// - Namespace created or annotated with deco.sites/valkey-acl: "true" +// - Secret "valkey-acl" deleted in a managed namespace (via Watches) +// - Periodic requeue (RequeueAfter: 10min) for self-healing after Valkey restarts +// +// To force an immediate full resync of all managed namespaces (e.g. after a Valkey failover), touch each one. +// Note that -l selects by label, so namespaces must carry deco.sites/valkey-acl=true as a label too: +// +// kubectl annotate ns -l deco.sites/valkey-acl=true \ +// deco.sites/valkey-acl-sync=$(date +%s) --overwrite +// +// # ACL replication caveat +// +// Valkey does NOT replicate ACL commands (ACL SETUSER/DELUSER) to replicas. +// The operator runs ACL commands only on the current Sentinel master. This means: +// +// 1. After a Sentinel failover, the new master starts without per-tenant ACLs. +// The periodic reconcile (10min) detects this and re-provisions all users. +// During the recovery window, deco falls back to FILE_SYSTEM cache. +// +// 2. Read replicas used by pods (LOADER_CACHE_REDIS_READ_URL) also lack the +// per-tenant ACL users. This is not enforced today (auth.enabled: false), +// but MUST be addressed before enabling Valkey auth in production. +// +// TODO: when enabling auth, extend ValkeyClient to provision ACL SETUSER on +// all nodes (master + every replica), not only the Sentinel master. type NamespaceReconciler struct { client.Client Scheme *runtime.Scheme From ddf895e183b25fa60654c1977259e0d412d568cd Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 21:51:29 -0300 Subject: [PATCH 11/17] fix: provision ACL users on all Valkey nodes, not just master MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Valkey does not replicate ACL commands. Pods using read replicas (LOADER_CACHE_REDIS_READ_URL) would fail auth if only the master had the per-tenant users. 
Changes: - sentinelClient now discovers replicas via Sentinel and runs ACL SETUSER/DELUSER on each node individually - Periodic reconcile always calls UpsertUser (idempotent) to keep all nodes in sync — covers master restart, replica replacement, and Sentinel failover scenarios Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 29 ++--- internal/valkey/client.go | 131 ++++++++++++++------ 2 files changed, 106 insertions(+), 54 deletions(-) diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index ad0f03f..8f97133 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -222,24 +222,19 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, fmt.Errorf("get secret: %w", err) default: - // Secret exists: verify the ACL user is present in Valkey (self-healing after restart). - exists, checkErr := r.ValkeyClient.UserExists(ctx, siteName) - if checkErr != nil { - valkeyACLErrors.WithLabelValues("check").Inc() - return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr) - } - if !exists { - password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"]) - log.Info("Valkey ACL user missing, re-provisioning", "user", siteName, "reason", "Valkey restart or external deletion") - if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil { - valkeyACLErrors.WithLabelValues("upsert").Inc() - return ctrl.Result{}, fmt.Errorf("re-upsert Valkey user: %w", upsertErr) - } - valkeyACLSelfHealed.Inc() - log.Info("Valkey ACL user re-provisioned", "user", siteName) - } else { - log.V(1).Info("Valkey ACL user OK", "user", siteName) + // Secret exists. Always call UpsertUser on the periodic reconcile — it is + // idempotent and ensures ACLs are present on ALL nodes (master + replicas). 
+ // Valkey does not replicate ACL commands, so a single UpsertUser call from + // the operator is the only way to keep every node in sync. This also covers: + // - Valkey master restart (ACLs lost from memory) + // - Replica replacement or restart + // - Sentinel failover (new master starts without ACLs) + password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"]) + if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil { + valkeyACLErrors.WithLabelValues("upsert").Inc() + return ctrl.Result{}, fmt.Errorf("sync Valkey user: %w", upsertErr) } + log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName) } // Requeue periodically to self-heal ACLs lost after Valkey restarts. diff --git a/internal/valkey/client.go b/internal/valkey/client.go index a60eb76..45b5c9a 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -31,7 +31,7 @@ type Client interface { UpsertUser(ctx context.Context, username, password string) error // DeleteUser removes a Valkey ACL user. DeleteUser(ctx context.Context, username string) error - // UserExists checks whether a Valkey ACL user exists. + // UserExists checks whether a Valkey ACL user exists on the master. UserExists(ctx context.Context, username string) (bool, error) // Close releases the underlying connection. Close() error @@ -44,8 +44,17 @@ type Config struct { AdminPassword string } +// sentinelClient provisions ACL users on the Sentinel master AND on every +// known replica. This is necessary because Valkey does not replicate ACL +// commands — each node maintains its own independent ACL table. Without +// provisioning on replicas, pods that connect via the read-replica endpoint +// (LOADER_CACHE_REDIS_READ_URL) will fail authentication once auth is enabled. type sentinelClient struct { - rdb *redis.Client + // master handles writes and ACL operations via Sentinel leader election. + master *redis.Client + // sentinel is used to discover all replica addresses. 
+ sentinel *redis.SentinelClient + cfg Config } // NewDirectClient returns a Client with a direct connection to a single Valkey instance. @@ -55,29 +64,77 @@ func NewDirectClient(addr, password string) Client { Addr: addr, Password: password, }) - return &sentinelClient{rdb: rdb} + return &sentinelClient{master: rdb} } -// NewSentinelClient returns a Client backed by a Sentinel-aware connection. -// The underlying go-redis FailoverClient resolves the current master automatically -// and reconnects after a Sentinel failover. +// NewSentinelClient returns a Client that provisions ACL users on the Sentinel +// master and all replicas. func NewSentinelClient(cfg Config) Client { - rdb := redis.NewFailoverClient(&redis.FailoverOptions{ + master := redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: cfg.MasterName, SentinelAddrs: cfg.SentinelAddrs, Password: cfg.AdminPassword, }) - return &sentinelClient{rdb: rdb} + sentinel := redis.NewSentinelClient(&redis.Options{ + Addr: cfg.SentinelAddrs[0], + Password: cfg.AdminPassword, + }) + return &sentinelClient{master: master, sentinel: sentinel, cfg: cfg} } -// UpsertUser issues ACL SETUSER to create or replace the per-tenant user. -// The user is restricted to keys matching :* and lock::*. -// nocommands resets any prior permission, then we add only what deco needs. +// UpsertUser provisions a per-tenant ACL user on the master and all replicas. func (c *sentinelClient) UpsertUser(ctx context.Context, username, password string) error { + if err := c.aclSetUser(ctx, c.master, username, password); err != nil { + return err + } + return c.forEachReplica(ctx, func(rdb *redis.Client) error { + return c.aclSetUser(ctx, rdb, username, password) + }) +} + +// DeleteUser removes a per-tenant ACL user from the master and all replicas. 
+func (c *sentinelClient) DeleteUser(ctx context.Context, username string) error { + if err := c.rdo(ctx, c.master, "ACL", "DELUSER", username); err != nil { + return fmt.Errorf("ACL DELUSER %s on master: %w", username, err) + } + return c.forEachReplica(ctx, func(rdb *redis.Client) error { + if err := c.rdo(ctx, rdb, "ACL", "DELUSER", username); err != nil { + return fmt.Errorf("ACL DELUSER %s on replica: %w", username, err) + } + return nil + }) +} + +// UserExists checks for the presence of a Valkey ACL user on the master. +// Replicas are not checked — if the master has the user, replicas should too. +func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, error) { + err := c.master.Do(ctx, "ACL", "GETUSER", username).Err() + if err == nil { + return true, nil + } + if errors.Is(err, redis.Nil) { + return false, nil + } + if strings.Contains(err.Error(), "No such user") { + return false, nil + } + return false, fmt.Errorf("ACL GETUSER %s: %w", username, err) +} + +// Close closes all underlying connections. +func (c *sentinelClient) Close() error { + if c.sentinel != nil { + _ = c.sentinel.Close() + } + return c.master.Close() +} + +// aclSetUser issues ACL SETUSER on a single node. +func (c *sentinelClient) aclSetUser(ctx context.Context, rdb *redis.Client, username, password string) error { args := []interface{}{ "ACL", "SETUSER", username, "on", - "resetpass", // clear any previously stored password hashes + "resetpass", ">" + password, "resetkeys", "~" + username + ":*", @@ -88,41 +145,41 @@ func (c *sentinelClient) UpsertUser(ctx context.Context, username, password stri "+@write", "+ping", } - if err := c.rdb.Do(ctx, args...).Err(); err != nil { + if err := rdb.Do(ctx, args...).Err(); err != nil { return fmt.Errorf("ACL SETUSER %s: %w", username, err) } return nil } -// DeleteUser issues ACL DELUSER to remove the per-tenant user. -// Returns nil if the user does not exist. 
-func (c *sentinelClient) DeleteUser(ctx context.Context, username string) error { - if err := c.rdb.Do(ctx, "ACL", "DELUSER", username).Err(); err != nil { - return fmt.Errorf("ACL DELUSER %s: %w", username, err) - } - return nil +// rdo runs a single Redis command on the given client. +func (c *sentinelClient) rdo(ctx context.Context, rdb *redis.Client, args ...interface{}) error { + return rdb.Do(ctx, args...).Err() } -// UserExists checks for the presence of a Valkey ACL user via ACL GETUSER. -func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, error) { - err := c.rdb.Do(ctx, "ACL", "GETUSER", username).Err() - if err == nil { - return true, nil +// forEachReplica discovers all current replicas via Sentinel and runs fn on each. +// Errors from individual replicas do not abort the loop; the last one is returned — a +// best-effort approach ensures a single unreachable replica doesn't stop the remaining ones. +func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Client) error) error { + if c.sentinel == nil { + return nil // direct client — no replicas to discover } - // redis.Nil means the command returned a nil response — treat as not found. - if errors.Is(err, redis.Nil) { - return false, nil + replicas, err := c.sentinel.Replicas(ctx, c.cfg.MasterName).Result() + if err != nil { + return fmt.Errorf("sentinel replicas: %w", err) } - // Valkey returns "ERR No such user" for unknown usernames. - if strings.Contains(err.Error(), "No such user") { - return false, nil + var lastErr error + for _, r := range replicas { + addr := r["ip"] + ":" + r["port"] + rdb := redis.NewClient(&redis.Options{ + Addr: addr, + Password: c.cfg.AdminPassword, + }) + if err := fn(rdb); err != nil { + lastErr = err + } + _ = rdb.Close() } - return false, fmt.Errorf("ACL GETUSER %s: %w", username, err) -} - -// Close closes the underlying Redis connection pool. 
-func (c *sentinelClient) Close() error { - return c.rdb.Close() + return lastErr } // NoopClient is a Client implementation that does nothing, used when Valkey From d034525cc5007683c7b4ec1a760bd175cba15d2b Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 21:56:58 -0300 Subject: [PATCH 12/17] feat: configurable ACL resync period via VALKEY_ACL_RESYNC_PERIOD Exposes --valkey-acl-resync-period flag (and VALKEY_ACL_RESYNC_PERIOD env var) to control how often ACL users are re-synced to all Valkey nodes. Defaults to 10m. Examples: 30m, 1h. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/main.go | 17 +++++++++++++++++ internal/controller/namespace_controller.go | 10 ++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index aafef9d..f191160 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "strings" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -77,6 +78,7 @@ func main() { var valkeySentinelURLs string var valkeySentinelMaster string var valkeyAdminPassword string + var valkeyResyncPeriod time.Duration flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -103,6 +105,9 @@ func main() { "Valkey Sentinel master name.") flag.StringVar(&valkeyAdminPassword, "valkey-admin-password", os.Getenv("VALKEY_ADMIN_PASSWORD"), "Password for the Valkey admin user used to manage ACLs.") + flag.DurationVar(&valkeyResyncPeriod, "valkey-acl-resync-period", + parseDuration(os.Getenv("VALKEY_ACL_RESYNC_PERIOD"), controller.DefaultResyncPeriod), + "How often to re-sync ACL users to all Valkey nodes (e.g. 
10m, 30m, 1h).") opts := zap.Options{ Development: true, } @@ -250,6 +255,7 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), ValkeyClient: valkeyClient, + ResyncPeriod: valkeyResyncPeriod, } if err := nsReconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Namespace") @@ -322,6 +328,17 @@ func main() { } } +func parseDuration(s string, fallback time.Duration) time.Duration { + if s == "" { + return fallback + } + d, err := time.ParseDuration(s) + if err != nil { + return fallback + } + return d +} + func getEnvOrDefault(key, defaultVal string) string { if v := os.Getenv(key); v != "" { return v diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 8f97133..fa38885 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -55,6 +55,11 @@ const ( // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;update;patch +// DefaultResyncPeriod is the default interval at which the reconciler re-syncs +// ACL users to all Valkey nodes even when nothing changed. Configurable via +// VALKEY_ACL_RESYNC_PERIOD (e.g. "10m", "30m", "1h"). +const DefaultResyncPeriod = 10 * time.Minute + // NamespaceReconciler provisions per-tenant Valkey ACL credentials for site namespaces. // When a Namespace has the annotation "deco.sites/valkey-acl: true", the reconciler: // - Creates a Valkey ACL user restricted to the site's key prefix. @@ -93,6 +98,7 @@ type NamespaceReconciler struct { client.Client Scheme *runtime.Scheme ValkeyClient valkey.Client + ResyncPeriod time.Duration } // InitMetrics seeds the tenants_provisioned gauge from current cluster state. 
@@ -237,8 +243,8 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName) } - // Requeue periodically to self-heal ACLs lost after Valkey restarts. - return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil + // Requeue periodically to sync ACLs to all Valkey nodes. + return ctrl.Result{RequeueAfter: r.ResyncPeriod}, nil } // createSecret creates the "valkey-acl" Secret in the given namespace with From da55cab0bc70f3a7c949f39311c4a4bb453cb0b2 Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 22:34:23 -0300 Subject: [PATCH 13/17] feat: Sentinel failover watch, all-node ACL sync, configurable resync, docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sentinel failover watch: - WatchFailover() subscribes to +switch-master pub/sub - Reconnects with exponential backoff — never crashes the operator - Enabled by default when VALKEY_SENTINEL_URLS is set - Disable via VALKEY_WATCH_FAILOVER=false - RecordSentinelFailover() increments sentinel_failovers_total metric All-node ACL sync: - UpsertUser/DeleteUser now run on master + all replicas via Sentinel discovery - Periodic reconcile always calls UpsertUser (idempotent) to keep nodes in sync - Covers: master restart, replica replacement, Sentinel failover Configurable resync period: - VALKEY_ACL_RESYNC_PERIOD env var (default 10m) - --valkey-acl-resync-period flag New metric: deco_operator_valkey_sentinel_failovers_total ARCHITECTURE.md: added Valkey ACL provisioning section with mermaid diagrams for initial provisioning and failover recovery flows, ACL replication caveat, periodic resync docs, and metrics table. 
Co-Authored-By: Claude Sonnet 4.6 (1M context) --- ARCHITECTURE.md | 100 +++++++++++++++++++- cmd/main.go | 20 ++++ internal/controller/metrics.go | 16 ++++ internal/controller/namespace_controller.go | 31 ++++++ internal/valkey/client.go | 79 ++++++++++++++-- 5 files changed, 237 insertions(+), 9 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d42c98b..ef6038d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -494,6 +494,101 @@ graph TB Helm --> K8s ``` +--- + +## Valkey ACL Provisioning + +Per-tenant cache isolation enforced at the Valkey protocol level. +Each site namespace gets a dedicated ACL user restricted to `~{site}:*` key patterns. + +### Flow: Initial Provisioning + +```mermaid +sequenceDiagram + participant GitOps as GitOps / Bootstrap + participant K8s as Kubernetes API + participant Ctrl as NamespaceReconciler + participant Valkey as Valkey (all nodes) + participant Webhook as Mutating Webhook + participant Pod as Site Pod + + GitOps->>K8s: kubectl annotate ns sites-loja
deco.sites/valkey-acl=true + K8s->>Ctrl: Reconcile Event + Ctrl->>Ctrl: Generate random password + Ctrl->>Valkey: ACL SETUSER loja on >pass ~loja:* ~lock:loja:*
(master + all replicas via Sentinel discovery) + Ctrl->>K8s: Create Secret "valkey-acl" in sites-loja
LOADER_CACHE_REDIS_USERNAME=loja
LOADER_CACHE_REDIS_PASSWORD= + Ctrl->>K8s: Patch ksvc spec.template.annotations
(triggers new Knative Revision) + K8s->>Webhook: Intercept ksvc update + Webhook->>K8s: Inject envFrom: secretRef: valkey-acl (optional=true) + K8s->>Pod: New pod starts with credentials + Pod->>Valkey: AUTH loja → reads/writes only ~loja:* +``` + +### Flow: Sentinel Failover Recovery + +```mermaid +sequenceDiagram + participant Sentinel as Valkey Sentinel + participant Ctrl as NamespaceReconciler + participant PubSub as +switch-master channel + participant Valkey as New Master + Replicas + + Note over Sentinel: Master fails + Sentinel->>Sentinel: Elect new master (quorum) + Sentinel->>PubSub: Publish +switch-master event + PubSub->>Ctrl: WatchFailover() receives event + Ctrl->>Ctrl: RecordSentinelFailover() metric++ + Ctrl->>Ctrl: TriggerResyncAll() — patch all annotated namespaces + loop For each managed namespace + Ctrl->>Valkey: ACL SETUSER (master + all replicas) + end + Note over Ctrl: Recovery in seconds, not minutes +``` + +### ACL Replication Caveat + +Valkey does **not** replicate `ACL SETUSER` commands to replicas. +The operator handles this by running ACL commands on every node individually: + +``` +UpsertUser(ctx, username, password) + ├── ACL SETUSER on master (via Sentinel FailoverClient) + ├── ACL SETUSER on replica-1 (direct connection, discovered via SENTINEL REPLICAS) + └── ACL SETUSER on replica-N +``` + +This ensures all nodes (master and read replicas) are always in sync, +which is required because site pods use a separate read-replica endpoint. + +### Periodic Resync + +The reconciler requeues every namespace on a configurable interval (default: 10min) +to ensure ACLs stay in sync after any undetected node restart. 
+ +``` +VALKEY_ACL_RESYNC_PERIOD=10m # env var or --valkey-acl-resync-period flag +``` + +To force immediate resync of all managed namespaces: + +```bash +kubectl annotate ns -l deco.sites/valkey-acl=true \ + deco.sites/valkey-acl-sync=$(date +%s) --overwrite +``` + +### Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `deco_operator_valkey_acl_provisioned_total` | Counter | ACL users provisioned | +| `deco_operator_valkey_acl_deleted_total` | Counter | ACL users deleted on namespace removal | +| `deco_operator_valkey_acl_errors_total{operation}` | Counter | Errors by operation (upsert/delete/check) | +| `deco_operator_valkey_acl_self_healed_total` | Counter | Re-provisions after ACL loss | +| `deco_operator_valkey_tenants_provisioned` | Gauge | Current provisioned tenants (seeded on startup) | +| `deco_operator_valkey_sentinel_failovers_total` | Counter | Sentinel +switch-master events detected | + +--- + ## Summary The Deco CMS Operator provides: @@ -504,7 +599,6 @@ The Deco CMS Operator provides: 4. **Reliable Notifications**: Parallel, time-bounded, with retries 5. **Deterministic Updates**: Timestamp-based verification 6. **Trackable Rollouts**: Watch conditions with commit/timestamp -7. **Production Ready**: Handles scale, failures, and edge cases - -**All with zero configuration required from users!** 🚀 +7. **Per-tenant Cache Isolation**: Valkey ACL per site, enforced at protocol level +8. **Production Ready**: Handles scale, failures, and edge cases diff --git a/cmd/main.go b/cmd/main.go index f191160..ea6c318 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -79,6 +79,7 @@ func main() { var valkeySentinelMaster string var valkeyAdminPassword string var valkeyResyncPeriod time.Duration + var valkeyWatchFailover bool flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. 
"+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -108,6 +109,10 @@ func main() { flag.DurationVar(&valkeyResyncPeriod, "valkey-acl-resync-period", parseDuration(os.Getenv("VALKEY_ACL_RESYNC_PERIOD"), controller.DefaultResyncPeriod), "How often to re-sync ACL users to all Valkey nodes (e.g. 10m, 30m, 1h).") + flag.BoolVar(&valkeyWatchFailover, "valkey-watch-failover", + os.Getenv("VALKEY_WATCH_FAILOVER") != "false", + "Subscribe to Sentinel +switch-master events and trigger immediate ACL resync on failover. "+ + "Enabled by default when VALKEY_SENTINEL_URLS is set. Set VALKEY_WATCH_FAILOVER=false to disable.") opts := zap.Options{ Development: true, } @@ -261,6 +266,21 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Namespace") os.Exit(1) } + // Start Sentinel failover watcher if enabled and Sentinel is configured. + // Fail-safe: if subscription fails, operator continues with periodic resync. + if valkeyWatchFailover && valkeySentinelURLs != "" { + if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + return valkeyClient.WatchFailover(ctx, func() { + controller.RecordSentinelFailover() + nsReconciler.TriggerResyncAll(ctx) + }) + })); err != nil { + setupLog.Error(err, "unable to add Sentinel failover watcher (non-fatal)") + } else { + setupLog.Info("Sentinel failover watcher enabled") + } + } + // Seed the tenants_provisioned gauge from current cluster state once the cache is warm. 
if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { if !mgr.GetCache().WaitForCacheSync(ctx) { diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go index 13316c5..d0fdc95 100644 --- a/internal/controller/metrics.go +++ b/internal/controller/metrics.go @@ -61,8 +61,23 @@ var ( Name: "tenants_provisioned", Help: "Current number of site namespaces with a provisioned Valkey ACL user.", }) + + // valkeySentinelFailovers counts Sentinel +switch-master events received. + // Each event triggers an immediate full ACL resync to all nodes. + valkeySentinelFailovers = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "deco_operator", + Subsystem: "valkey", + Name: "sentinel_failovers_total", + Help: "Total number of Sentinel master failovers detected via +switch-master pub/sub.", + }) ) +// RecordSentinelFailover increments the sentinel_failovers_total counter. +// Called from main.go when a +switch-master event is received. +func RecordSentinelFailover() { + valkeySentinelFailovers.Inc() +} + func init() { metrics.Registry.MustRegister( valkeyACLProvisioned, @@ -70,5 +85,6 @@ func init() { valkeyACLErrors, valkeyACLSelfHealed, valkeyTenantsProvisioned, + valkeySentinelFailovers, ) } diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index fa38885..dc6dfc8 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -101,6 +101,37 @@ type NamespaceReconciler struct { ResyncPeriod time.Duration } +// TriggerResyncAll immediately re-queues all managed namespaces by updating a +// sync annotation. Called on Sentinel failover events to recover ACLs without +// waiting for the next periodic resync cycle. 
+func (r *NamespaceReconciler) TriggerResyncAll(ctx context.Context) { + log := logf.FromContext(ctx).WithName("valkey-resync") + nsList := &corev1.NamespaceList{} + if err := r.List(ctx, nsList); err != nil { + log.Error(err, "Failed to list namespaces for resync") + return + } + now := time.Now().UTC().Format(time.RFC3339) + count := 0 + for i := range nsList.Items { + ns := &nsList.Items[i] + if ns.Annotations[valkeyACLAnnotation] != "true" { + continue + } + patch := client.MergeFrom(ns.DeepCopy()) + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations["deco.sites/valkey-acl-sync"] = now + if err := r.Patch(ctx, ns, patch); err != nil { + log.Error(err, "Failed to patch namespace for resync", "namespace", ns.Name) + continue + } + count++ + } + log.Info("Triggered ACL resync on all managed namespaces", "count", count) +} + // InitMetrics seeds the tenants_provisioned gauge from current cluster state. // Must be called after the cache is synced (i.e. inside a Runnable or after mgr.Start). func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error { diff --git a/internal/valkey/client.go b/internal/valkey/client.go index 45b5c9a..fab58e6 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "strings" + "time" "github.com/redis/go-redis/v9" + "sigs.k8s.io/controller-runtime/pkg/log" ) // Client manages Valkey ACL users for tenant isolation. @@ -33,6 +35,12 @@ type Client interface { DeleteUser(ctx context.Context, username string) error // UserExists checks whether a Valkey ACL user exists on the master. UserExists(ctx context.Context, username string) (bool, error) + // WatchFailover subscribes to Sentinel "+switch-master" events and calls + // onFailover whenever a new master is elected. It reconnects automatically + // on disconnect and returns only when ctx is cancelled. It is a no-op for + // non-Sentinel clients. 
Errors are non-fatal — if Sentinel is unreachable + // the operator continues with its periodic resync. + WatchFailover(ctx context.Context, onFailover func()) error // Close releases the underlying connection. Close() error } @@ -121,6 +129,66 @@ func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, return false, fmt.Errorf("ACL GETUSER %s: %w", username, err) } +// WatchFailover subscribes to the Sentinel "+switch-master" pub/sub channel. +// It calls onFailover whenever a master election is detected, then reconnects +// automatically. Safe to call even if Sentinel is temporarily unavailable — +// errors are logged and retried with backoff, never propagated to the caller. +func (c *sentinelClient) WatchFailover(ctx context.Context, onFailover func()) error { + if c.sentinel == nil { + return nil // direct client — no Sentinel to watch + } + logger := log.FromContext(ctx).WithName("valkey-failover-watch") + go func() { + backoff := 2 * time.Second + for { + if err := ctx.Err(); err != nil { + return + } + if err := c.subscribeOnce(ctx, onFailover); err != nil { + logger.Error(err, "Sentinel pub/sub disconnected, retrying", "backoff", backoff) + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + if backoff < 2*time.Minute { + backoff *= 2 + } + } + } else { + backoff = 2 * time.Second // reset on clean exit + } + } + }() + return nil +} + +// subscribeOnce opens one pub/sub session and blocks until the connection drops +// or ctx is cancelled. It calls onFailover for each +switch-master event received. 
+func (c *sentinelClient) subscribeOnce(ctx context.Context, onFailover func()) error { + logger := log.FromContext(ctx).WithName("valkey-failover-watch") + ps := c.sentinel.Subscribe(ctx, "+switch-master") + defer func() { _ = ps.Close() }() + logger.Info("Subscribed to Sentinel +switch-master events") + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-ps.Channel(): + if !ok { + return fmt.Errorf("channel closed") + } + logger.Info("Sentinel failover detected, triggering ACL resync", + "event", msg.Payload) + onFailover() + } + } +} + +// FailoverEventHook is called by WatchFailover on each detected failover event. +// Used by main.go to increment the sentinel_failovers_total metric without +// creating a dependency between the valkey and controller packages. +type FailoverEventHook func() + // Close closes all underlying connections. func (c *sentinelClient) Close() error { if c.sentinel != nil { @@ -186,9 +254,8 @@ func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Clie // configuration is absent (e.g., local development or auth not yet enabled). 
type NoopClient struct{} -func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } -func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } -func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { - return false, nil -} -func (NoopClient) Close() error { return nil } +func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } +func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } +func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil } +func (NoopClient) WatchFailover(_ context.Context, _ func()) error { return nil } +func (NoopClient) Close() error { return nil } From a576c8ec954dac95b8f4e7c8ea4b2d1fa7bb106c Mon Sep 17 00:00:00 2001 From: decobot Date: Sun, 12 Apr 2026 22:40:20 -0300 Subject: [PATCH 14/17] fix: goconst and gofmt lint errors Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 9 ++++++--- internal/valkey/client.go | 10 +++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index dc6dfc8..3cdc188 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -60,6 +60,9 @@ const ( // VALKEY_ACL_RESYNC_PERIOD (e.g. "10m", "30m", "1h"). const DefaultResyncPeriod = 10 * time.Minute +// valkeyACLAnnotationValue is the expected value of the opt-in annotation. +const valkeyACLAnnotationValue = "true" + // NamespaceReconciler provisions per-tenant Valkey ACL credentials for site namespaces. // When a Namespace has the annotation "deco.sites/valkey-acl: true", the reconciler: // - Creates a Valkey ACL user restricted to the site's key prefix. 
@@ -115,7 +118,7 @@ func (r *NamespaceReconciler) TriggerResyncAll(ctx context.Context) { count := 0 for i := range nsList.Items { ns := &nsList.Items[i] - if ns.Annotations[valkeyACLAnnotation] != "true" { + if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue { continue } patch := client.MergeFrom(ns.DeepCopy()) @@ -141,7 +144,7 @@ func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error { } count := 0.0 for _, ns := range nsList.Items { - if ns.Annotations[valkeyACLAnnotation] != "true" { + if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue { continue } secret := &corev1.Secret{} @@ -189,7 +192,7 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // Only process namespaces with the opt-in annotation. - if ns.Annotations[valkeyACLAnnotation] != "true" { + if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue { return ctrl.Result{}, nil } diff --git a/internal/valkey/client.go b/internal/valkey/client.go index fab58e6..7b1f1ad 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -254,8 +254,8 @@ func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Clie // configuration is absent (e.g., local development or auth not yet enabled). 
type NoopClient struct{} -func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } -func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } -func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil } -func (NoopClient) WatchFailover(_ context.Context, _ func()) error { return nil } -func (NoopClient) Close() error { return nil } +func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil } +func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil } +func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil } +func (NoopClient) WatchFailover(_ context.Context, _ func()) error { return nil } +func (NoopClient) Close() error { return nil } From 491fdd84f1858011e4b2efb69105add68e147a2c Mon Sep 17 00:00:00 2001 From: decobot Date: Mon, 13 Apr 2026 09:23:38 -0300 Subject: [PATCH 15/17] fix: restrict Sentinel failover watcher to leader pod only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit controller-runtime starts Runnables on all replicas by default. leaderElectedRunnable implements NeedLeaderElection() so only the active leader subscribes to +switch-master — prevents redundant TriggerResyncAll calls from non-leader replicas with multiple operator pods. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- cmd/main.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ea6c318..7128821 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -267,14 +267,16 @@ func main() { os.Exit(1) } // Start Sentinel failover watcher if enabled and Sentinel is configured. + // leaderElectedRunnable ensures only the active leader subscribes — prevents + // redundant TriggerResyncAll calls from non-leader replicas. // Fail-safe: if subscription fails, operator continues with periodic resync. 
if valkeyWatchFailover && valkeySentinelURLs != "" { - if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + if err := mgr.Add(&leaderElectedRunnable{fn: func(ctx context.Context) error { return valkeyClient.WatchFailover(ctx, func() { controller.RecordSentinelFailover() nsReconciler.TriggerResyncAll(ctx) }) - })); err != nil { + }}); err != nil { setupLog.Error(err, "unable to add Sentinel failover watcher (non-fatal)") } else { setupLog.Info("Sentinel failover watcher enabled") @@ -348,6 +350,21 @@ func main() { } } +// leaderElectedRunnable wraps a function so it only runs on the active leader. +// controller-runtime starts Runnables on all replicas by default; implementing +// NeedLeaderElection() restricts execution to the leader pod. +type leaderElectedRunnable struct { + fn func(ctx context.Context) error +} + +func (r *leaderElectedRunnable) Start(ctx context.Context) error { + return r.fn(ctx) +} + +func (r *leaderElectedRunnable) NeedLeaderElection() bool { + return true +} + func parseDuration(s string, fallback time.Duration) time.Duration { if s == "" { return fallback From 174760ac2f0bc5e4410f3821cd099b73f386387c Mon Sep 17 00:00:00 2001 From: decobot Date: Mon, 13 Apr 2026 09:46:33 -0300 Subject: [PATCH 16/17] fix: address code review findings - forEachReplica: try all sentinel addresses with fallback instead of only SentinelAddrs[0], collect all replica errors with errors.Join, log each failure explicitly - namespace_controller: validate password non-empty before UpsertUser to prevent ACL reset with invalid credentials - namespace_controller: restore valkeyACLSelfHealed increment by checking UserExists before UpsertUser in the periodic sync path - Remove unused sentinel field from sentinelClient struct Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/controller/namespace_controller.go | 16 +++++- internal/valkey/client.go | 64 ++++++++++++++------- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git 
a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index 3cdc188..d701fe8 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -270,11 +270,25 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // - Replica replacement or restart // - Sentinel failover (new master starts without ACLs) password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"]) + if password == "" { + log.Error(nil, "valkey-acl Secret has empty password, skipping ACL sync", "namespace", ns.Name) + return ctrl.Result{RequeueAfter: r.ResyncPeriod}, nil + } + existed, checkErr := r.ValkeyClient.UserExists(ctx, siteName) + if checkErr != nil { + valkeyACLErrors.WithLabelValues("check").Inc() + return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr) + } if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil { valkeyACLErrors.WithLabelValues("upsert").Inc() return ctrl.Result{}, fmt.Errorf("sync Valkey user: %w", upsertErr) } - log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName) + if !existed { + valkeyACLSelfHealed.Inc() + log.Info("Valkey ACL user re-provisioned (self-heal)", "user", siteName) + } else { + log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName) + } } // Requeue periodically to sync ACLs to all Valkey nodes. diff --git a/internal/valkey/client.go b/internal/valkey/client.go index 7b1f1ad..04ff9d6 100644 --- a/internal/valkey/client.go +++ b/internal/valkey/client.go @@ -60,9 +60,7 @@ type Config struct { type sentinelClient struct { // master handles writes and ACL operations via Sentinel leader election. master *redis.Client - // sentinel is used to discover all replica addresses. - sentinel *redis.SentinelClient - cfg Config + cfg Config } // NewDirectClient returns a Client with a direct connection to a single Valkey instance. 
@@ -83,11 +81,7 @@ func NewSentinelClient(cfg Config) Client { SentinelAddrs: cfg.SentinelAddrs, Password: cfg.AdminPassword, }) - sentinel := redis.NewSentinelClient(&redis.Options{ - Addr: cfg.SentinelAddrs[0], - Password: cfg.AdminPassword, - }) - return &sentinelClient{master: master, sentinel: sentinel, cfg: cfg} + return &sentinelClient{master: master, cfg: cfg} } // UpsertUser provisions a per-tenant ACL user on the master and all replicas. @@ -134,7 +128,7 @@ func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, // automatically. Safe to call even if Sentinel is temporarily unavailable — // errors are logged and retried with backoff, never propagated to the caller. func (c *sentinelClient) WatchFailover(ctx context.Context, onFailover func()) error { - if c.sentinel == nil { + if len(c.cfg.SentinelAddrs) == 0 { return nil // direct client — no Sentinel to watch } logger := log.FromContext(ctx).WithName("valkey-failover-watch") @@ -166,7 +160,13 @@ func (c *sentinelClient) WatchFailover(ctx context.Context, onFailover func()) e // or ctx is cancelled. It calls onFailover for each +switch-master event received. func (c *sentinelClient) subscribeOnce(ctx context.Context, onFailover func()) error { logger := log.FromContext(ctx).WithName("valkey-failover-watch") - ps := c.sentinel.Subscribe(ctx, "+switch-master") + // Connect to the first reachable sentinel for pub/sub. + sc := redis.NewSentinelClient(&redis.Options{ + Addr: c.cfg.SentinelAddrs[0], + Password: c.cfg.AdminPassword, + }) + ps := sc.Subscribe(ctx, "+switch-master") + defer func() { _ = sc.Close(); _ = ps.Close() }() defer func() { _ = ps.Close() }() logger.Info("Subscribed to Sentinel +switch-master events") for { @@ -191,9 +191,6 @@ type FailoverEventHook func() // Close closes all underlying connections. 
func (c *sentinelClient) Close() error { - if c.sentinel != nil { - _ = c.sentinel.Close() - } return c.master.Close() } @@ -225,29 +222,52 @@ func (c *sentinelClient) rdo(ctx context.Context, rdb *redis.Client, args ...int } // forEachReplica discovers all current replicas via Sentinel and runs fn on each. -// Errors from individual replicas are logged but do not abort the loop — a -// best-effort approach ensures a single unreachable replica doesn't block provisioning. +// It tries each sentinel address until one responds, avoiding a single point of +// failure. All replica errors are collected and logged; provisioning continues +// even if individual replicas are unreachable. func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Client) error) error { - if c.sentinel == nil { + if len(c.cfg.SentinelAddrs) == 0 { return nil // direct client — no replicas to discover } - replicas, err := c.sentinel.Replicas(ctx, c.cfg.MasterName).Result() + logger := log.FromContext(ctx) + + replicas, err := c.sentinelReplicas(ctx) if err != nil { - return fmt.Errorf("sentinel replicas: %w", err) + return fmt.Errorf("discover replicas: %w", err) } - var lastErr error + + var errs []error for _, r := range replicas { addr := r["ip"] + ":" + r["port"] rdb := redis.NewClient(&redis.Options{ Addr: addr, Password: c.cfg.AdminPassword, }) - if err := fn(rdb); err != nil { - lastErr = err + if fnErr := fn(rdb); fnErr != nil { + logger.Error(fnErr, "ACL operation failed on replica", "addr", addr) + errs = append(errs, fmt.Errorf("replica %s: %w", addr, fnErr)) } _ = rdb.Close() } - return lastErr + return errors.Join(errs...) +} + +// sentinelReplicas queries each sentinel address until one returns the replica list. 
+func (c *sentinelClient) sentinelReplicas(ctx context.Context) ([]map[string]string, error) { + var lastErr error + for _, addr := range c.cfg.SentinelAddrs { + sc := redis.NewSentinelClient(&redis.Options{ + Addr: addr, + Password: c.cfg.AdminPassword, + }) + replicas, err := sc.Replicas(ctx, c.cfg.MasterName).Result() + _ = sc.Close() + if err == nil { + return replicas, nil + } + lastErr = err + } + return nil, fmt.Errorf("all sentinels unreachable: %w", lastErr) } // NoopClient is a Client implementation that does nothing, used when Valkey From 588fee9208535bf32ec8a6ae28a62d172d591af6 Mon Sep 17 00:00:00 2001 From: decobot Date: Mon, 13 Apr 2026 09:54:13 -0300 Subject: [PATCH 17/17] fix: remaining code review issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - InitMetrics: propagate non-NotFound errors instead of silently ignoring them — prevents incorrect gauge seeding on API failures - RBAC: reduce Secrets permissions to get;list;watch;create — update, patch and delete are not used by any controller today - Helm: support VALKEY_ADMIN_PASSWORD via existingSecret/existingSecretKey (secretKeyRef) to avoid credentials in plaintext manifests and Helm release history; inline adminPassword still available for dev/testing Co-Authored-By: Claude Sonnet 4.6 (1M context) --- chart/templates/clusterrole-operator-manager-role.yaml | 10 +++++++++- .../deployment-operator-controller-manager.yaml | 8 +++++++- chart/values.yaml | 4 ++++ config/rbac/role.yaml | 10 +++++++++- hack/helm-generator/main.go | 8 +++++++- internal/controller/namespace_controller.go | 10 +++++++--- 6 files changed, 43 insertions(+), 7 deletions(-) diff --git a/chart/templates/clusterrole-operator-manager-role.yaml b/chart/templates/clusterrole-operator-manager-role.yaml index 8551f75..5a1777f 100644 --- a/chart/templates/clusterrole-operator-manager-role.yaml +++ b/chart/templates/clusterrole-operator-manager-role.yaml @@ -7,7 +7,6 @@ rules: - "" resources: - 
configmaps - - secrets verbs: - create - delete @@ -34,6 +33,15 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - create + - get + - list + - watch - apiGroups: - deco.sites resources: diff --git a/chart/templates/deployment-operator-controller-manager.yaml b/chart/templates/deployment-operator-controller-manager.yaml index 9b7d104..3203945 100644 --- a/chart/templates/deployment-operator-controller-manager.yaml +++ b/chart/templates/deployment-operator-controller-manager.yaml @@ -42,7 +42,13 @@ spec: value: {{ .Values.valkey.sentinelUrls | quote }} - name: VALKEY_SENTINEL_MASTER_NAME value: {{ .Values.valkey.sentinelMasterName | quote }} - {{- if .Values.valkey.adminPassword }} + {{- if .Values.valkey.existingSecret }} + - name: VALKEY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.valkey.existingSecret | quote }} + key: {{ .Values.valkey.existingSecretKey | quote }} + {{- else if .Values.valkey.adminPassword }} - name: VALKEY_ADMIN_PASSWORD value: {{ .Values.valkey.adminPassword | quote }} {{- end }} diff --git a/chart/values.yaml b/chart/values.yaml index 9719e79..26a219e 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -22,7 +22,11 @@ github: valkey: sentinelUrls: "" sentinelMasterName: "mymaster" + # adminPassword: inline value (not recommended for production). + # Use existingSecret + existingSecretKey to source from a K8s Secret instead. 
adminPassword: "" + existingSecret: "" + existingSecretKey: "password" # Resource limits and requests resources: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3e64d78..9b28df2 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -8,7 +8,6 @@ rules: - "" resources: - configmaps - - secrets verbs: - create - delete @@ -35,6 +34,15 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - create + - get + - list + - watch - apiGroups: - deco.sites resources: diff --git a/hack/helm-generator/main.go b/hack/helm-generator/main.go index 2a6de91..26cbfb8 100644 --- a/hack/helm-generator/main.go +++ b/hack/helm-generator/main.go @@ -198,7 +198,13 @@ func addEnvVarsToDeployment(templatesDir string) error { value: {{ .Values.valkey.sentinelUrls | quote }} - name: VALKEY_SENTINEL_MASTER_NAME value: {{ .Values.valkey.sentinelMasterName | quote }} - {{- if .Values.valkey.adminPassword }} + {{- if .Values.valkey.existingSecret }} + - name: VALKEY_ADMIN_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.valkey.existingSecret | quote }} + key: {{ .Values.valkey.existingSecretKey | quote }} + {{- else if .Values.valkey.adminPassword }} - name: VALKEY_ADMIN_PASSWORD value: {{ .Values.valkey.adminPassword | quote }} {{- end }} diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go index d701fe8..ccdef03 100644 --- a/internal/controller/namespace_controller.go +++ b/internal/controller/namespace_controller.go @@ -52,7 +52,7 @@ const ( ) // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create // +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;update;patch // DefaultResyncPeriod is the default interval at which the reconciler 
re-syncs @@ -148,9 +148,13 @@ func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error { continue } secret := &corev1.Secret{} - if err := r.Get(ctx, types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}, secret); err == nil { - count++ + if err := r.Get(ctx, types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}, secret); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("reading secret in %s: %w", ns.Name, err) + } + continue } + count++ } valkeyTenantsProvisioned.Set(count) return nil