diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index d42c98b..ef6038d 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -494,6 +494,101 @@ graph TB
Helm --> K8s
```
+---
+
+## Valkey ACL Provisioning
+
+Per-tenant cache isolation enforced at the Valkey protocol level.
+Each site namespace gets a dedicated ACL user restricted to the `~{site}:*` and `~lock:{site}:*` key patterns.
+
+### Flow: Initial Provisioning
+
+```mermaid
+sequenceDiagram
+ participant GitOps as GitOps / Bootstrap
+ participant K8s as Kubernetes API
+ participant Ctrl as NamespaceReconciler
+ participant Valkey as Valkey (all nodes)
+ participant Webhook as Mutating Webhook
+ participant Pod as Site Pod
+
+ GitOps->>K8s: kubectl annotate ns sites-loja
deco.sites/valkey-acl=true
+ K8s->>Ctrl: Reconcile Event
+ Ctrl->>Ctrl: Generate random password
+ Ctrl->>Valkey: ACL SETUSER loja on >pass ~loja:* ~lock:loja:*
(master + all replicas via Sentinel discovery)
+ Ctrl->>K8s: Create Secret "valkey-acl" in sites-loja
LOADER_CACHE_REDIS_USERNAME=loja
LOADER_CACHE_REDIS_PASSWORD=
+ Ctrl->>K8s: Patch ksvc spec.template.annotations
(triggers new Knative Revision)
+ K8s->>Webhook: Intercept ksvc update
+ Webhook->>K8s: Inject envFrom: secretRef: valkey-acl (optional=true)
+ K8s->>Pod: New pod starts with credentials
+ Pod->>Valkey: AUTH loja → reads/writes only ~loja:*
+```
+
+### Flow: Sentinel Failover Recovery
+
+```mermaid
+sequenceDiagram
+ participant Sentinel as Valkey Sentinel
+ participant Ctrl as NamespaceReconciler
+ participant PubSub as +switch-master channel
+ participant Valkey as New Master + Replicas
+
+ Note over Sentinel: Master fails
+ Sentinel->>Sentinel: Elect new master (quorum)
+ Sentinel->>PubSub: Publish +switch-master event
+ PubSub->>Ctrl: WatchFailover() receives event
+ Ctrl->>Ctrl: RecordSentinelFailover() metric++
+ Ctrl->>Ctrl: TriggerResyncAll() — patch all annotated namespaces
+ loop For each managed namespace
+ Ctrl->>Valkey: ACL SETUSER (master + all replicas)
+ end
+ Note over Ctrl: Recovery in seconds, not minutes
+```
+
+### ACL Replication Caveat
+
+Valkey does **not** replicate `ACL SETUSER` commands to replicas.
+The operator handles this by running ACL commands on every node individually:
+
+```
+UpsertUser(ctx, username, password)
+ ├── ACL SETUSER on master (via Sentinel FailoverClient)
+ ├── ACL SETUSER on replica-1 (direct connection, discovered via SENTINEL REPLICAS)
+ └── ACL SETUSER on replica-N
+```
+
+This ensures all nodes (master and read replicas) are always in sync,
+which is required because site pods use a separate read-replica endpoint.
+
+### Periodic Resync
+
+The reconciler requeues every namespace on a configurable interval (default: 10min)
+so ACLs converge even after a node restart or ACL loss that escaped failover detection.
+
+```
+VALKEY_ACL_RESYNC_PERIOD=10m # env var or --valkey-acl-resync-period flag
+```
+
+To force immediate resync of all managed namespaces:
+
+```bash
+kubectl get ns -o name | grep '^namespace/sites-' | cut -d/ -f2 | xargs -I{} \
+  kubectl annotate ns {} deco.sites/valkey-acl-sync=$(date +%s) --overwrite
+```
+
+### Metrics
+
+| Metric | Type | Description |
+|--------|------|-------------|
+| `deco_operator_valkey_acl_provisioned_total` | Counter | ACL users provisioned |
+| `deco_operator_valkey_acl_deleted_total` | Counter | ACL users deleted on namespace removal |
+| `deco_operator_valkey_acl_errors_total{operation}` | Counter | Errors by operation (upsert/delete/check) |
+| `deco_operator_valkey_acl_self_healed_total` | Counter | Re-provisions after ACL loss |
+| `deco_operator_valkey_tenants_provisioned` | Gauge | Current provisioned tenants (seeded on startup) |
+| `deco_operator_valkey_sentinel_failovers_total` | Counter | Sentinel +switch-master events detected |
+
+---
+
## Summary
The Deco CMS Operator provides:
@@ -504,7 +599,6 @@ The Deco CMS Operator provides:
4. **Reliable Notifications**: Parallel, time-bounded, with retries
5. **Deterministic Updates**: Timestamp-based verification
6. **Trackable Rollouts**: Watch conditions with commit/timestamp
-7. **Production Ready**: Handles scale, failures, and edge cases
-
-**All with zero configuration required from users!** 🚀
+7. **Per-tenant Cache Isolation**: Valkey ACL per site, enforced at protocol level
+8. **Production Ready**: Handles scale, failures, and edge cases
diff --git a/chart/templates/clusterrole-operator-manager-role.yaml b/chart/templates/clusterrole-operator-manager-role.yaml
index 993fb50..5a1777f 100644
--- a/chart/templates/clusterrole-operator-manager-role.yaml
+++ b/chart/templates/clusterrole-operator-manager-role.yaml
@@ -15,12 +15,30 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - ""
+ resources:
+ - namespaces
+ verbs:
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- ""
resources:
- pods
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ""
+ resources:
- secrets
verbs:
+ - create
- get
- list
- watch
@@ -57,4 +75,6 @@ rules:
verbs:
- get
- list
+ - patch
+ - update
- watch
\ No newline at end of file
diff --git a/chart/templates/deployment-operator-controller-manager.yaml b/chart/templates/deployment-operator-controller-manager.yaml
index 466d680..3203945 100644
--- a/chart/templates/deployment-operator-controller-manager.yaml
+++ b/chart/templates/deployment-operator-controller-manager.yaml
@@ -31,11 +31,29 @@ spec:
command:
- /manager
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
- {{- if .Values.github.token }}
+ {{- if or .Values.github.token .Values.valkey.sentinelUrls }}
env:
+ {{- if .Values.github.token }}
- name: GITHUB_TOKEN
value: {{ .Values.github.token | quote }}
{{- end }}
+ {{- if .Values.valkey.sentinelUrls }}
+ - name: VALKEY_SENTINEL_URLS
+ value: {{ .Values.valkey.sentinelUrls | quote }}
+ - name: VALKEY_SENTINEL_MASTER_NAME
+ value: {{ .Values.valkey.sentinelMasterName | quote }}
+ {{- if .Values.valkey.existingSecret }}
+ - name: VALKEY_ADMIN_PASSWORD
+ valueFrom:
+ secretKeyRef:
+ name: {{ .Values.valkey.existingSecret | quote }}
+ key: {{ .Values.valkey.existingSecretKey | quote }}
+ {{- else if .Values.valkey.adminPassword }}
+ - name: VALKEY_ADMIN_PASSWORD
+ value: {{ .Values.valkey.adminPassword | quote }}
+ {{- end }}
+ {{- end }}
+ {{- end }}
livenessProbe:
httpGet:
path: /healthz
diff --git a/chart/values.yaml b/chart/values.yaml
index 01746eb..26a219e 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -15,6 +15,19 @@ replicaCount: 1
github:
token: ""
+# Valkey (Redis) ACL provisioning
+# When sentinelUrls is set, the operator provisions per-tenant ACL users in Valkey
+# for each namespace annotated with deco.sites/valkey-acl: "true".
+# Leave sentinelUrls empty to disable ACL provisioning (NoopClient).
+valkey:
+ sentinelUrls: ""
+ sentinelMasterName: "mymaster"
+ # adminPassword: inline value (not recommended for production).
+ # Use existingSecret + existingSecretKey to source from a K8s Secret instead.
+ adminPassword: ""
+ existingSecret: ""
+ existingSecretKey: "password"
+
# Resource limits and requests
resources:
limits:
diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go
new file mode 100644
index 0000000..8248e9c
--- /dev/null
+++ b/cmd/bootstrap/main.go
@@ -0,0 +1,118 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// bootstrap is a one-shot CLI that annotates existing site Namespaces so the
+// NamespaceReconciler picks them up and provisions Valkey ACL credentials.
+//
+// Usage:
+//
+// go run ./cmd/bootstrap --namespace-pattern sites-
+// go run ./cmd/bootstrap --namespace-pattern sites- --dry-run
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "os"
+ "strings"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+ _ "k8s.io/client-go/plugin/pkg/client/auth"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/log/zap"
+)
+
+const valkeyACLAnnotation = "deco.sites/valkey-acl"
+
+func main() {
+ var namespacePattern string
+ var dryRun bool
+
+ flag.StringVar(&namespacePattern, "namespace-pattern", "sites-",
+ "Prefix used to filter Namespaces eligible for Valkey ACL provisioning.")
+ flag.BoolVar(&dryRun, "dry-run", false,
+ "Print which Namespaces would be annotated without making changes.")
+
+ opts := zap.Options{Development: true}
+ opts.BindFlags(flag.CommandLine)
+ flag.Parse()
+
+ ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
+ log := ctrl.Log.WithName("bootstrap")
+
+ scheme := runtime.NewScheme()
+ utilruntime.Must(clientgoscheme.AddToScheme(scheme))
+
+ cfg, err := ctrl.GetConfig()
+ if err != nil {
+ log.Error(err, "Failed to get kubeconfig")
+ os.Exit(1)
+ }
+
+ k8s, err := client.New(cfg, client.Options{Scheme: scheme})
+ if err != nil {
+ log.Error(err, "Failed to create Kubernetes client")
+ os.Exit(1)
+ }
+
+ ctx := context.Background()
+
+ nsList := &corev1.NamespaceList{}
+ if err := k8s.List(ctx, nsList); err != nil {
+ log.Error(err, "Failed to list Namespaces")
+ os.Exit(1)
+ }
+
+ annotated, skipped := 0, 0
+ for i := range nsList.Items {
+ ns := &nsList.Items[i]
+
+ if !strings.HasPrefix(ns.Name, namespacePattern) {
+ continue
+ }
+
+ if ns.Annotations[valkeyACLAnnotation] == "true" {
+ log.Info("Already annotated, skipping", "namespace", ns.Name)
+ skipped++
+ continue
+ }
+
+ if dryRun {
+ fmt.Printf("[dry-run] would annotate namespace %s\n", ns.Name)
+ annotated++
+ continue
+ }
+
+ patch := client.MergeFrom(ns.DeepCopy())
+ if ns.Annotations == nil {
+ ns.Annotations = make(map[string]string)
+ }
+ ns.Annotations[valkeyACLAnnotation] = "true"
+ if err := k8s.Patch(ctx, ns, patch); err != nil {
+ log.Error(err, "Failed to annotate namespace", "namespace", ns.Name)
+ continue
+ }
+ log.Info("Annotated namespace", "namespace", ns.Name)
+ annotated++
+ }
+
+ log.Info("Bootstrap complete", "annotated", annotated, "skipped", skipped)
+}
diff --git a/cmd/main.go b/cmd/main.go
index 6328064..7128821 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -17,10 +17,14 @@ limitations under the License.
package main
import (
+ "context"
"crypto/tls"
"flag"
+ "fmt"
"os"
"path/filepath"
+ "strings"
+ "time"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
@@ -33,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
@@ -41,6 +46,7 @@ import (
decositesv1alpha1 "github.com/deco-sites/decofile-operator/api/v1alpha1"
"github.com/deco-sites/decofile-operator/internal/controller"
+ "github.com/deco-sites/decofile-operator/internal/valkey"
webhookv1 "github.com/deco-sites/decofile-operator/internal/webhook/v1"
// +kubebuilder:scaffold:imports
)
@@ -68,6 +74,12 @@ func main() {
var secureMetrics bool
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)
+ var valkeyURL string
+ var valkeySentinelURLs string
+ var valkeySentinelMaster string
+ var valkeyAdminPassword string
+ var valkeyResyncPeriod time.Duration
+ var valkeyWatchFailover bool
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -85,6 +97,22 @@ func main() {
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
+ flag.StringVar(&valkeyURL, "valkey-url", os.Getenv("VALKEY_URL"),
+ "Direct Valkey address (host:port). Takes precedence over sentinel. For local dev only.")
+ flag.StringVar(&valkeySentinelURLs, "valkey-sentinel-urls", os.Getenv("VALKEY_SENTINEL_URLS"),
+ "Comma-separated list of Valkey Sentinel addresses (host:port). When empty, ACL provisioning is disabled.")
+ flag.StringVar(&valkeySentinelMaster, "valkey-sentinel-master",
+ getEnvOrDefault("VALKEY_SENTINEL_MASTER_NAME", "mymaster"),
+ "Valkey Sentinel master name.")
+ flag.StringVar(&valkeyAdminPassword, "valkey-admin-password", os.Getenv("VALKEY_ADMIN_PASSWORD"),
+ "Password for the Valkey admin user used to manage ACLs.")
+ flag.DurationVar(&valkeyResyncPeriod, "valkey-acl-resync-period",
+ parseDuration(os.Getenv("VALKEY_ACL_RESYNC_PERIOD"), controller.DefaultResyncPeriod),
+ "How often to re-sync ACL users to all Valkey nodes (e.g. 10m, 30m, 1h).")
+ flag.BoolVar(&valkeyWatchFailover, "valkey-watch-failover",
+ os.Getenv("VALKEY_WATCH_FAILOVER") != "false",
+ "Subscribe to Sentinel +switch-master events and trigger immediate ACL resync on failover. "+
+ "Enabled by default when VALKEY_SENTINEL_URLS is set. Set VALKEY_WATCH_FAILOVER=false to disable.")
opts := zap.Options{
Development: true,
}
@@ -206,6 +234,66 @@ func main() {
os.Exit(1)
}
+ // Build Valkey client. Falls back to a no-op client when Sentinel URLs are not configured
+ // so the operator works in environments where Valkey ACL provisioning is not needed.
+ var valkeyClient valkey.Client
+ switch {
+ case valkeyURL != "":
+ valkeyClient = valkey.NewDirectClient(valkeyURL, valkeyAdminPassword)
+ defer func() { _ = valkeyClient.Close() }()
+ setupLog.Info("Valkey ACL provisioning enabled (direct)", "addr", valkeyURL)
+ case valkeySentinelURLs != "":
+ valkeyClient = valkey.NewSentinelClient(valkey.Config{
+ SentinelAddrs: strings.Split(valkeySentinelURLs, ","),
+ MasterName: valkeySentinelMaster,
+ AdminPassword: valkeyAdminPassword,
+ })
+ defer func() { _ = valkeyClient.Close() }()
+ setupLog.Info("Valkey ACL provisioning enabled (sentinel)",
+ "sentinel", valkeySentinelURLs, "master", valkeySentinelMaster)
+ default:
+ valkeyClient = valkey.NoopClient{}
+ setupLog.Info("Valkey ACL provisioning disabled (set VALKEY_URL or VALKEY_SENTINEL_URLS)")
+ }
+
+ nsReconciler := &controller.NamespaceReconciler{
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ ValkeyClient: valkeyClient,
+ ResyncPeriod: valkeyResyncPeriod,
+ }
+ if err := nsReconciler.SetupWithManager(mgr); err != nil {
+ setupLog.Error(err, "unable to create controller", "controller", "Namespace")
+ os.Exit(1)
+ }
+ // Start Sentinel failover watcher if enabled and Sentinel is configured.
+ // leaderElectedRunnable ensures only the active leader subscribes — prevents
+ // redundant TriggerResyncAll calls from non-leader replicas.
+ // Fail-safe: if subscription fails, operator continues with periodic resync.
+ if valkeyWatchFailover && valkeySentinelURLs != "" {
+ if err := mgr.Add(&leaderElectedRunnable{fn: func(ctx context.Context) error {
+ return valkeyClient.WatchFailover(ctx, func() {
+ controller.RecordSentinelFailover()
+ nsReconciler.TriggerResyncAll(ctx)
+ })
+ }}); err != nil {
+ setupLog.Error(err, "unable to add Sentinel failover watcher (non-fatal)")
+ } else {
+ setupLog.Info("Sentinel failover watcher enabled")
+ }
+ }
+
+ // Seed the tenants_provisioned gauge from current cluster state once the cache is warm.
+ if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
+ if !mgr.GetCache().WaitForCacheSync(ctx) {
+ return fmt.Errorf("cache never synced")
+ }
+ return nsReconciler.InitMetrics(ctx)
+ })); err != nil {
+ setupLog.Error(err, "unable to add metrics init runnable")
+ os.Exit(1)
+ }
+
// Create shared HTTP client for pod notifications to prevent connection leaks
httpClient := controller.NewHTTPClient()
@@ -261,3 +349,36 @@ func main() {
os.Exit(1)
}
}
+
+// leaderElectedRunnable wraps a function so it only runs on the active leader.
+// controller-runtime starts Runnables on all replicas by default; implementing
+// NeedLeaderElection() restricts execution to the leader pod.
+type leaderElectedRunnable struct {
+ fn func(ctx context.Context) error
+}
+
+func (r *leaderElectedRunnable) Start(ctx context.Context) error {
+ return r.fn(ctx)
+}
+
+func (r *leaderElectedRunnable) NeedLeaderElection() bool {
+ return true
+}
+
+func parseDuration(s string, fallback time.Duration) time.Duration {
+ if s == "" {
+ return fallback
+ }
+ d, err := time.ParseDuration(s)
+ if err != nil {
+ return fallback
+ }
+ return d
+}
+
+func getEnvOrDefault(key, defaultVal string) string {
+ if v := os.Getenv(key); v != "" {
+ return v
+ }
+ return defaultVal
+}
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index d547341..9b28df2 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -16,12 +16,30 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - ""
+ resources:
+ - namespaces
+ verbs:
+ - get
+ - list
+ - patch
+ - update
+ - watch
- apiGroups:
- ""
resources:
- pods
+ verbs:
+ - get
+ - list
+ - watch
+- apiGroups:
+ - ""
+ resources:
- secrets
verbs:
+ - create
- get
- list
- watch
@@ -58,4 +76,6 @@ rules:
verbs:
- get
- list
+ - patch
+ - update
- watch
diff --git a/go.mod b/go.mod
index c0b8155..7f73982 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -56,6 +57,7 @@ require (
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
+	github.com/redis/go-redis/v9 v9.18.0
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
@@ -69,6 +71,7 @@ require (
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
+ go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
diff --git a/go.sum b/go.sum
index ecfa09d..338285b 100644
--- a/go.sum
+++ b/go.sum
@@ -19,6 +19,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU=
github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls=
@@ -114,6 +116,8 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
+github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
+github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -162,6 +166,8 @@ go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42s
go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4=
go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
+go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
diff --git a/hack/helm-generator/main.go b/hack/helm-generator/main.go
index 493fac0..26cbfb8 100644
--- a/hack/helm-generator/main.go
+++ b/hack/helm-generator/main.go
@@ -83,9 +83,9 @@ func main() {
fileCount++
}
- // Add GITHUB_TOKEN to deployment
- if err := addGitHubTokenToDeployment(templatesDir); err != nil {
- fmt.Fprintf(os.Stderr, "Warning: Could not add GITHUB_TOKEN: %v\n", err)
+ // Add env vars to deployment
+ if err := addEnvVarsToDeployment(templatesDir); err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Could not add env vars to deployment: %v\n", err)
}
fmt.Printf("✓ Generated %d Helm templates\n\n", fileCount)
@@ -172,7 +172,7 @@ func addConditionals(content, kind string) string {
return content
}
-func addGitHubTokenToDeployment(templatesDir string) error {
+func addEnvVarsToDeployment(templatesDir string) error {
files, err := filepath.Glob(filepath.Join(templatesDir, "deployment-*.yaml"))
if err != nil || len(files) == 0 {
return fmt.Errorf("no deployment file found")
@@ -187,10 +187,28 @@ func addGitHubTokenToDeployment(templatesDir string) error {
contentStr := string(content)
// Find the image line and add env vars after it
- envBlock := ` {{- if .Values.github.token }}
+ envBlock := ` {{- if or .Values.github.token .Values.valkey.sentinelUrls }}
env:
+ {{- if .Values.github.token }}
- name: GITHUB_TOKEN
value: {{ .Values.github.token | quote }}
+ {{- end }}
+ {{- if .Values.valkey.sentinelUrls }}
+ - name: VALKEY_SENTINEL_URLS
+ value: {{ .Values.valkey.sentinelUrls | quote }}
+ - name: VALKEY_SENTINEL_MASTER_NAME
+ value: {{ .Values.valkey.sentinelMasterName | quote }}
+ {{- if .Values.valkey.existingSecret }}
+ - name: VALKEY_ADMIN_PASSWORD
+ valueFrom:
+ secretKeyRef:
+ name: {{ .Values.valkey.existingSecret | quote }}
+ key: {{ .Values.valkey.existingSecretKey | quote }}
+ {{- else if .Values.valkey.adminPassword }}
+ - name: VALKEY_ADMIN_PASSWORD
+ value: {{ .Values.valkey.adminPassword | quote }}
+ {{- end }}
+ {{- end }}
{{- end }}`
re := regexp.MustCompile(`(?m)( image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}")`)
diff --git a/internal/controller/metrics.go b/internal/controller/metrics.go
new file mode 100644
index 0000000..d0fdc95
--- /dev/null
+++ b/internal/controller/metrics.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "sigs.k8s.io/controller-runtime/pkg/metrics"
+)
+
+var (
+ // valkeyACLProvisioned counts successful ACL user + Secret provisioning operations.
+ valkeyACLProvisioned = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "acl_provisioned_total",
+ Help: "Total number of Valkey ACL users provisioned (new or re-provisioned).",
+ })
+
+ // valkeyACLDeleted counts ACL user deletions triggered by namespace removal.
+ valkeyACLDeleted = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "acl_deleted_total",
+ Help: "Total number of Valkey ACL users deleted on namespace removal.",
+ })
+
+ // valkeyACLErrors counts failures when interacting with Valkey.
+ valkeyACLErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "acl_errors_total",
+ Help: "Total number of Valkey ACL operation errors by operation type.",
+ }, []string{"operation"}) // operation: upsert | delete | check
+
+ // valkeyACLSelfHealed counts how many times an ACL was re-created after Valkey restart.
+ valkeyACLSelfHealed = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "acl_self_healed_total",
+ Help: "Total number of Valkey ACL users re-provisioned after being lost (e.g. Valkey restart).",
+ })
+
+ // valkeyTenantsProvisioned tracks the current number of provisioned tenants (gauge).
+ valkeyTenantsProvisioned = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "tenants_provisioned",
+ Help: "Current number of site namespaces with a provisioned Valkey ACL user.",
+ })
+
+ // valkeySentinelFailovers counts Sentinel +switch-master events received.
+ // Each event triggers an immediate full ACL resync to all nodes.
+ valkeySentinelFailovers = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "deco_operator",
+ Subsystem: "valkey",
+ Name: "sentinel_failovers_total",
+ Help: "Total number of Sentinel master failovers detected via +switch-master pub/sub.",
+ })
+)
+
+// RecordSentinelFailover increments the sentinel_failovers_total counter.
+// Called from main.go when a +switch-master event is received.
+func RecordSentinelFailover() {
+ valkeySentinelFailovers.Inc()
+}
+
+func init() {
+ metrics.Registry.MustRegister(
+ valkeyACLProvisioned,
+ valkeyACLDeleted,
+ valkeyACLErrors,
+ valkeyACLSelfHealed,
+ valkeyTenantsProvisioned,
+ valkeySentinelFailovers,
+ )
+}
diff --git a/internal/controller/namespace_controller.go b/internal/controller/namespace_controller.go
new file mode 100644
index 0000000..ccdef03
--- /dev/null
+++ b/internal/controller/namespace_controller.go
@@ -0,0 +1,359 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/base64"
+ "fmt"
+ "strings"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+ servingknativedevv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+ "github.com/deco-sites/decofile-operator/internal/valkey"
+)
+
+const (
+ // valkeyACLAnnotation is the opt-in namespace annotation watched by the reconciler.
+ valkeyACLAnnotation = "deco.sites/valkey-acl"
+ // valkeyACLFinalizer blocks namespace deletion until the Valkey ACL user is removed.
+ valkeyACLFinalizer = "deco.sites/valkey-acl"
+ // valkeySecretName is the name of the per-namespace credentials Secret.
+ valkeySecretName = "valkey-acl"
+ // valkeyProvisionedAnnot is stamped onto ksvc spec.template to force a new Revision.
+ valkeyProvisionedAnnot = "deco.sites/valkey-acl-provisioned"
+
+ // siteNamespacePrefix is stripped from the namespace name to derive the ACL username.
+ siteNamespacePrefix = "sites-"
+)
+
+// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;update;patch
+// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create
+// +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;update;patch
+
+// DefaultResyncPeriod is the default interval at which the reconciler re-syncs
+// ACL users to all Valkey nodes even when nothing changed. Configurable via
+// VALKEY_ACL_RESYNC_PERIOD (e.g. "10m", "30m", "1h").
+const DefaultResyncPeriod = 10 * time.Minute
+
+// valkeyACLAnnotationValue is the expected value of the opt-in annotation.
+const valkeyACLAnnotationValue = "true"
+
+// NamespaceReconciler provisions per-tenant Valkey ACL credentials for site namespaces.
+// When a Namespace has the annotation "deco.sites/valkey-acl: true", the reconciler:
+// - Creates a Valkey ACL user restricted to the site's key prefix.
+// - Creates a K8s Secret "valkey-acl" in that namespace with the credentials.
+// - Patches the Knative Service to trigger a new Revision that picks up the Secret.
+// - Cleans up the Valkey ACL user when the namespace is deleted.
+//
+// # What triggers a reconcile
+//
+// - Namespace created or annotated with deco.sites/valkey-acl: "true"
+// - Secret "valkey-acl" deleted in a managed namespace (via Watches)
+// - Periodic requeue (RequeueAfter: 10min) for self-healing after Valkey restarts
+//
+// To force an immediate full resync of all managed namespaces (e.g. after a Valkey
+// failover), touch all annotated namespaces to trigger reconcile on each:
+//
+// kubectl annotate ns -l deco.sites/valkey-acl=true \
+// deco.sites/valkey-acl-sync=$(date +%s) --overwrite
+//
+// # ACL replication caveat
+//
+// Valkey does NOT replicate ACL commands (ACL SETUSER/DELUSER) to replicas.
+// The operator runs ACL commands only on the current Sentinel master. This means:
+//
+// 1. After a Sentinel failover, the new master starts without per-tenant ACLs.
+// The periodic reconcile (10min) detects this and re-provisions all users.
+// During the recovery window, deco falls back to FILE_SYSTEM cache.
+//
+// 2. Read replicas used by pods (LOADER_CACHE_REDIS_READ_URL) also lack the
+// per-tenant ACL users. This is not enforced today (auth.enabled: false),
+// but MUST be addressed before enabling Valkey auth in production.
+//
+// TODO: when enabling auth, extend ValkeyClient to provision ACL SETUSER on
+// all nodes (master + every replica), not only the Sentinel master.
+type NamespaceReconciler struct {
+ client.Client
+ // Scheme is the runtime scheme used by the manager (required by controller-runtime).
+ Scheme *runtime.Scheme
+ // ValkeyClient executes ACL SETUSER/DELUSER against the Valkey deployment.
+ ValkeyClient valkey.Client
+ // ResyncPeriod is the periodic self-heal interval; see DefaultResyncPeriod.
+ ResyncPeriod time.Duration
+}
+
+// TriggerResyncAll immediately re-queues all managed namespaces by updating a
+// sync annotation. Called on Sentinel failover events to recover ACLs without
+// waiting for the next periodic resync cycle.
+func (r *NamespaceReconciler) TriggerResyncAll(ctx context.Context) {
+ log := logf.FromContext(ctx).WithName("valkey-resync")
+ nsList := &corev1.NamespaceList{}
+ if err := r.List(ctx, nsList); err != nil {
+ log.Error(err, "Failed to list namespaces for resync")
+ return
+ }
+ now := time.Now().UTC().Format(time.RFC3339)
+ count := 0
+ for i := range nsList.Items {
+ ns := &nsList.Items[i]
+ if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue {
+ continue
+ }
+ patch := client.MergeFrom(ns.DeepCopy())
+ if ns.Annotations == nil {
+ ns.Annotations = make(map[string]string)
+ }
+ ns.Annotations["deco.sites/valkey-acl-sync"] = now
+ if err := r.Patch(ctx, ns, patch); err != nil {
+ log.Error(err, "Failed to patch namespace for resync", "namespace", ns.Name)
+ continue
+ }
+ count++
+ }
+ log.Info("Triggered ACL resync on all managed namespaces", "count", count)
+}
+
+// InitMetrics seeds the tenants_provisioned gauge from current cluster state.
+// Must be called after the cache is synced (i.e. inside a Runnable or after mgr.Start).
+func (r *NamespaceReconciler) InitMetrics(ctx context.Context) error {
+ nsList := &corev1.NamespaceList{}
+ if err := r.List(ctx, nsList); err != nil {
+ return err
+ }
+ count := 0.0
+ for _, ns := range nsList.Items {
+ if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue {
+ continue
+ }
+ secret := &corev1.Secret{}
+ if err := r.Get(ctx, types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}, secret); err != nil {
+ if !errors.IsNotFound(err) {
+ return fmt.Errorf("reading secret in %s: %w", ns.Name, err)
+ }
+ continue
+ }
+ count++
+ }
+ valkeyTenantsProvisioned.Set(count)
+ return nil
+}
+
+// SetupWithManager registers the Namespace controller with a resync period for
+// self-healing (recovers ACLs lost after a Valkey restart).
+func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ // Watch Secrets named "valkey-acl" and enqueue the parent Namespace.
+ // Namespace is cluster-scoped so Owns() (which relies on owner references) cannot
+ // be used across scopes. Instead we map Secret → Namespace by name.
+ secretToNamespace := handler.EnqueueRequestsFromMapFunc(
+ func(_ context.Context, obj client.Object) []reconcile.Request {
+ // Ignore every Secret except the operator-managed credentials Secret.
+ if obj.GetName() != valkeySecretName {
+ return nil
+ }
+ // Namespace objects are cluster-scoped: the request carries only a Name,
+ // which is the namespace the Secret lives in.
+ return []reconcile.Request{
+ {NamespacedName: types.NamespacedName{Name: obj.GetNamespace()}},
+ }
+ },
+ )
+
+ return ctrl.NewControllerManagedBy(mgr).
+ For(&corev1.Namespace{}).
+ Watches(&corev1.Secret{}, secretToNamespace).
+ WithOptions(controller.Options{
+ // Multiple workers so one slow Valkey round-trip does not serialize
+ // provisioning for every other tenant.
+ MaxConcurrentReconciles: 4,
+ }).
+ Named("namespace-valkey").
+ Complete(r)
+}
+
+// Reconcile provisions, self-heals, and tears down per-tenant Valkey ACL
+// credentials for a single Namespace. See the NamespaceReconciler doc comment
+// for the full lifecycle and the ACL replication caveat.
+func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+ log := logf.FromContext(ctx).WithValues("namespace", req.Name)
+
+ // Guard against a zero-value ResyncPeriod: RequeueAfter of 0 means "no
+ // requeue", which would silently disable the periodic self-heal loop the
+ // whole ACL design depends on (recovery after Valkey restarts/failovers).
+ resyncPeriod := r.ResyncPeriod
+ if resyncPeriod <= 0 {
+ resyncPeriod = DefaultResyncPeriod
+ }
+
+ ns := &corev1.Namespace{}
+ if err := r.Get(ctx, req.NamespacedName, ns); err != nil {
+ return ctrl.Result{}, client.IgnoreNotFound(err)
+ }
+
+ // Only process namespaces with the opt-in annotation.
+ if ns.Annotations[valkeyACLAnnotation] != valkeyACLAnnotationValue {
+ return ctrl.Result{}, nil
+ }
+
+ siteName := siteNameFromNamespace(ns.Name)
+
+ // Handle deletion: remove the Valkey ACL user before the namespace is gone.
+ if !ns.DeletionTimestamp.IsZero() {
+ if controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+ log.Info("Deleting Valkey ACL user", "user", siteName)
+ if err := r.ValkeyClient.DeleteUser(ctx, siteName); err != nil {
+ log.Error(err, "Failed to delete Valkey ACL user, will retry")
+ valkeyACLErrors.WithLabelValues("delete").Inc()
+ return ctrl.Result{}, err
+ }
+ valkeyACLDeleted.Inc()
+ valkeyTenantsProvisioned.Dec()
+ log.Info("Valkey ACL user deleted", "user", siteName)
+ controllerutil.RemoveFinalizer(ns, valkeyACLFinalizer)
+ if err := r.Update(ctx, ns); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+ return ctrl.Result{}, nil
+ }
+
+ // Ensure finalizer is present so we can clean up on deletion.
+ if !controllerutil.ContainsFinalizer(ns, valkeyACLFinalizer) {
+ controllerutil.AddFinalizer(ns, valkeyACLFinalizer)
+ if err := r.Update(ctx, ns); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+
+ // Check whether the credential Secret already exists.
+ secret := &corev1.Secret{}
+ secretKey := types.NamespacedName{Name: valkeySecretName, Namespace: ns.Name}
+ err := r.Get(ctx, secretKey, secret)
+
+ switch {
+ case errors.IsNotFound(err):
+ // Secret does not exist: generate credentials, create ACL user and Secret.
+ password, genErr := generatePassword()
+ if genErr != nil {
+ return ctrl.Result{}, fmt.Errorf("generate password: %w", genErr)
+ }
+
+ log.Info("Provisioning Valkey ACL user", "user", siteName)
+ if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+ valkeyACLErrors.WithLabelValues("upsert").Inc()
+ return ctrl.Result{}, fmt.Errorf("upsert Valkey user: %w", upsertErr)
+ }
+
+ // If this Create fails, the next reconcile regenerates a password and
+ // UpsertUser (resetpass) overwrites the orphaned credential — safe.
+ if createErr := r.createSecret(ctx, ns.Name, siteName, password); createErr != nil {
+ return ctrl.Result{}, fmt.Errorf("create secret: %w", createErr)
+ }
+
+ valkeyACLProvisioned.Inc()
+ valkeyTenantsProvisioned.Inc()
+ log.Info("Valkey ACL provisioned", "user", siteName, "namespace", ns.Name)
+
+ // Trigger a new Knative Revision so running pods pick up the new Secret.
+ if patchErr := r.patchKnativeServiceTimestamp(ctx, ns.Name); patchErr != nil {
+ log.Error(patchErr, "Failed to patch Knative Service (non-fatal)")
+ }
+
+ case err != nil:
+ return ctrl.Result{}, fmt.Errorf("get secret: %w", err)
+
+ default:
+ // Secret exists. Always call UpsertUser on the periodic reconcile — it is
+ // idempotent and ensures ACLs are present on ALL nodes (master + replicas).
+ // Valkey does not replicate ACL commands, so a single UpsertUser call from
+ // the operator is the only way to keep every node in sync. This also covers:
+ // - Valkey master restart (ACLs lost from memory)
+ // - Replica replacement or restart
+ // - Sentinel failover (new master starts without ACLs)
+ password := string(secret.Data["LOADER_CACHE_REDIS_PASSWORD"])
+ if password == "" {
+ log.Error(nil, "valkey-acl Secret has empty password, skipping ACL sync", "namespace", ns.Name)
+ return ctrl.Result{RequeueAfter: resyncPeriod}, nil
+ }
+ existed, checkErr := r.ValkeyClient.UserExists(ctx, siteName)
+ if checkErr != nil {
+ valkeyACLErrors.WithLabelValues("check").Inc()
+ return ctrl.Result{}, fmt.Errorf("check Valkey user: %w", checkErr)
+ }
+ if upsertErr := r.ValkeyClient.UpsertUser(ctx, siteName, password); upsertErr != nil {
+ valkeyACLErrors.WithLabelValues("upsert").Inc()
+ return ctrl.Result{}, fmt.Errorf("sync Valkey user: %w", upsertErr)
+ }
+ if !existed {
+ valkeyACLSelfHealed.Inc()
+ log.Info("Valkey ACL user re-provisioned (self-heal)", "user", siteName)
+ } else {
+ log.V(1).Info("Valkey ACL synced to all nodes", "user", siteName)
+ }
+ }
+
+ // Requeue periodically to sync ACLs to all Valkey nodes.
+ return ctrl.Result{RequeueAfter: resyncPeriod}, nil
+}
+
+// createSecret creates the "valkey-acl" Secret in the given namespace with
+// credentials ready to be consumed by deco via LOADER_CACHE_REDIS_USERNAME/PASSWORD.
+// Only called when the Secret is absent; Create returns AlreadyExists if a
+// concurrent reconcile won the race, which the caller surfaces for retry.
+func (r *NamespaceReconciler) createSecret(ctx context.Context, namespace, username, password string) error {
+ secret := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: valkeySecretName,
+ Namespace: namespace,
+ },
+ // StringData is write-only convenience; the API server stores it under Data.
+ StringData: map[string]string{
+ "LOADER_CACHE_REDIS_USERNAME": username,
+ "LOADER_CACHE_REDIS_PASSWORD": password,
+ },
+ }
+ return r.Create(ctx, secret)
+}
+
+// patchKnativeServiceTimestamp adds/updates the "deco.sites/valkey-acl-provisioned"
+// annotation on every Knative Service in the namespace. This causes Knative to create
+// a new Revision whose pods will mount the just-created valkey-acl Secret.
+// Returns an error when listing fails or when any individual patch failed, so
+// the caller's logging of a non-fatal patch failure is actually reachable.
+func (r *NamespaceReconciler) patchKnativeServiceTimestamp(ctx context.Context, namespace string) error {
+ log := logf.FromContext(ctx)
+
+ svcList := &servingknativedevv1.ServiceList{}
+ if err := r.List(ctx, svcList, client.InNamespace(namespace)); err != nil {
+ return fmt.Errorf("list Knative Services: %w", err)
+ }
+
+ now := time.Now().UTC().Format(time.RFC3339)
+ failed := 0
+ for i := range svcList.Items {
+ svc := &svcList.Items[i]
+ patch := client.MergeFrom(svc.DeepCopy())
+ // Must annotate spec.template, not metadata — Knative only creates a new
+ // Revision when spec.template changes.
+ if svc.Spec.Template.Annotations == nil {
+ svc.Spec.Template.Annotations = make(map[string]string)
+ }
+ svc.Spec.Template.Annotations[valkeyProvisionedAnnot] = now
+ if err := r.Patch(ctx, svc, patch); err != nil {
+ log.Error(err, "Failed to patch Knative Service", "service", svc.Name)
+ failed++
+ }
+ }
+ if failed > 0 {
+ // Previously individual patch errors were only logged and nil was always
+ // returned, silently reporting success to the caller.
+ return fmt.Errorf("patching %d of %d Knative Services failed in namespace %s", failed, len(svcList.Items), namespace)
+ }
+ return nil
+}
+
+// siteNameFromNamespace derives the Valkey ACL username from the K8s namespace name.
+// The "sites-" prefix is stripped when present so the username matches DECO_SITE_NAME.
+func siteNameFromNamespace(namespace string) string {
+ if name, found := strings.CutPrefix(namespace, siteNamespacePrefix); found {
+ return name
+ }
+ return namespace
+}
+
+// generatePassword produces a cryptographically random 32-byte URL-safe base64 string.
+func generatePassword() (string, error) {
+ raw := make([]byte, 32)
+ _, err := rand.Read(raw)
+ if err != nil {
+ return "", err
+ }
+ return base64.URLEncoding.EncodeToString(raw), nil
+}
diff --git a/internal/valkey/client.go b/internal/valkey/client.go
new file mode 100644
index 0000000..04ff9d6
--- /dev/null
+++ b/internal/valkey/client.go
@@ -0,0 +1,281 @@
+/*
+Copyright 2025.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package valkey
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/redis/go-redis/v9"
+ "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// Client manages Valkey ACL users for tenant isolation.
+type Client interface {
+ // UpsertUser creates or replaces a Valkey ACL user restricted to the given prefix.
+ // Idempotent: safe to call repeatedly with the same credentials.
+ UpsertUser(ctx context.Context, username, password string) error
+ // DeleteUser removes a Valkey ACL user.
+ DeleteUser(ctx context.Context, username string) error
+ // UserExists checks whether a Valkey ACL user exists on the master.
+ UserExists(ctx context.Context, username string) (bool, error)
+ // WatchFailover subscribes to Sentinel "+switch-master" events and calls
+ // onFailover whenever a new master is elected. It reconnects automatically
+ // on disconnect and returns only when ctx is cancelled. It is a no-op for
+ // non-Sentinel clients. Errors are non-fatal — if Sentinel is unreachable
+ // the operator continues with its periodic resync.
+ WatchFailover(ctx context.Context, onFailover func()) error
+ // Close releases the underlying connection.
+ Close() error
+}
+
+// Config holds the configuration required to connect to Valkey via Sentinel.
+type Config struct {
+ // SentinelAddrs are the "host:port" addresses of the Sentinel instances.
+ SentinelAddrs []string
+ // MasterName is the Sentinel-monitored master set name.
+ MasterName string
+ // AdminPassword authenticates the operator against Sentinel and Valkey nodes.
+ AdminPassword string
+}
+
+// sentinelClient provisions ACL users on the Sentinel master AND on every
+// known replica. This is necessary because Valkey does not replicate ACL
+// commands — each node maintains its own independent ACL table. Without
+// provisioning on replicas, pods that connect via the read-replica endpoint
+// (LOADER_CACHE_REDIS_READ_URL) will fail authentication once auth is enabled.
+type sentinelClient struct {
+ // master handles writes and ACL operations via Sentinel leader election.
+ master *redis.Client
+ // cfg is zero-valued for direct clients; an empty SentinelAddrs disables
+ // replica discovery and failover watching.
+ cfg Config
+}
+
+// NewDirectClient returns a Client with a direct connection to a single Valkey instance.
+// Intended for local development and testing — not for production use.
+// The returned client has an empty Config, so replica fan-out and failover
+// watching are no-ops.
+func NewDirectClient(addr, password string) Client {
+ rdb := redis.NewClient(&redis.Options{
+ Addr: addr,
+ Password: password,
+ })
+ return &sentinelClient{master: rdb}
+}
+
+// NewSentinelClient returns a Client that provisions ACL users on the Sentinel
+// master and all replicas.
+func NewSentinelClient(cfg Config) Client {
+ // NewFailoverClient tracks the current master through Sentinel, so ACL
+ // writes keep landing on the master across failovers.
+ master := redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: cfg.MasterName,
+ SentinelAddrs: cfg.SentinelAddrs,
+ Password: cfg.AdminPassword,
+ })
+ return &sentinelClient{master: master, cfg: cfg}
+}
+
+// UpsertUser provisions a per-tenant ACL user on the master and all replicas.
+func (c *sentinelClient) UpsertUser(ctx context.Context, username, password string) error {
+ // Master first: writes land there, so its ACL table matters most.
+ if err := c.aclSetUser(ctx, c.master, username, password); err != nil {
+ return err
+ }
+ // Then every replica — Valkey does not replicate ACL commands.
+ apply := func(node *redis.Client) error {
+ return c.aclSetUser(ctx, node, username, password)
+ }
+ return c.forEachReplica(ctx, apply)
+}
+
+// DeleteUser removes a per-tenant ACL user from the master and all replicas.
+func (c *sentinelClient) DeleteUser(ctx context.Context, username string) error {
+ if err := c.rdo(ctx, c.master, "ACL", "DELUSER", username); err != nil {
+ return fmt.Errorf("ACL DELUSER %s on master: %w", username, err)
+ }
+ drop := func(node *redis.Client) error {
+ if err := c.rdo(ctx, node, "ACL", "DELUSER", username); err != nil {
+ return fmt.Errorf("ACL DELUSER %s on replica: %w", username, err)
+ }
+ return nil
+ }
+ return c.forEachReplica(ctx, drop)
+}
+
+// UserExists checks for the presence of a Valkey ACL user on the master.
+// Replicas are not checked — if the master has the user, replicas should too.
+func (c *sentinelClient) UserExists(ctx context.Context, username string) (bool, error) {
+ err := c.master.Do(ctx, "ACL", "GETUSER", username).Err()
+ switch {
+ case err == nil:
+ return true, nil
+ case errors.Is(err, redis.Nil):
+ // go-redis reports an empty GETUSER reply as redis.Nil.
+ return false, nil
+ case strings.Contains(err.Error(), "No such user"):
+ // Some server versions answer with an error string instead of a nil reply.
+ return false, nil
+ default:
+ return false, fmt.Errorf("ACL GETUSER %s: %w", username, err)
+ }
+}
+
+// WatchFailover subscribes to the Sentinel "+switch-master" pub/sub channel.
+// It calls onFailover whenever a master election is detected, then reconnects
+// automatically. Safe to call even if Sentinel is temporarily unavailable —
+// errors are logged and retried with backoff, never propagated to the caller.
+// The watch goroutine's lifetime is bound to ctx; it exits on cancellation.
+func (c *sentinelClient) WatchFailover(ctx context.Context, onFailover func()) error {
+ if len(c.cfg.SentinelAddrs) == 0 {
+ return nil // direct client — no Sentinel to watch
+ }
+ logger := log.FromContext(ctx).WithName("valkey-failover-watch")
+ go func() {
+ // Exponential backoff between reconnect attempts, capped at 2 minutes.
+ backoff := 2 * time.Second
+ for {
+ if err := ctx.Err(); err != nil {
+ return
+ }
+ // subscribeOnce blocks until the pub/sub connection drops or ctx ends.
+ if err := c.subscribeOnce(ctx, onFailover); err != nil {
+ logger.Error(err, "Sentinel pub/sub disconnected, retrying", "backoff", backoff)
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(backoff):
+ if backoff < 2*time.Minute {
+ backoff *= 2
+ }
+ }
+ } else {
+ backoff = 2 * time.Second // reset on clean exit
+ }
+ }
+ }()
+ return nil
+}
+
+// subscribeOnce opens one pub/sub session and blocks until the connection drops
+// or ctx is cancelled. It calls onFailover for each +switch-master event received.
+func (c *sentinelClient) subscribeOnce(ctx context.Context, onFailover func()) error {
+ logger := log.FromContext(ctx).WithName("valkey-failover-watch")
+ // Connect to the first reachable sentinel for pub/sub.
+ sc := redis.NewSentinelClient(&redis.Options{
+ Addr: c.cfg.SentinelAddrs[0],
+ Password: c.cfg.AdminPassword,
+ })
+ ps := sc.Subscribe(ctx, "+switch-master")
+ // Single consolidated defer: the previous version registered ps.Close twice
+ // (double close of the PubSub). Close the subscription before its client.
+ defer func() {
+ _ = ps.Close()
+ _ = sc.Close()
+ }()
+ logger.Info("Subscribed to Sentinel +switch-master events")
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case msg, ok := <-ps.Channel():
+ if !ok {
+ return fmt.Errorf("channel closed")
+ }
+ logger.Info("Sentinel failover detected, triggering ACL resync",
+ "event", msg.Payload)
+ onFailover()
+ }
+ }
+}
+
+// FailoverEventHook is called by WatchFailover on each detected failover event.
+// Used by main.go to increment the sentinel_failovers_total metric without
+// creating a dependency between the valkey and controller packages.
+type FailoverEventHook func()
+
+// Close closes all underlying connections.
+// Per-replica and Sentinel pub/sub connections are short-lived and closed at
+// their call sites; only the failover master client is long-lived.
+func (c *sentinelClient) Close() error {
+ return c.master.Close()
+}
+
+// aclSetUser issues ACL SETUSER on a single node.
+// Rule order matters: each reset* directive clears prior state before the new
+// password, key patterns, and command categories are granted.
+func (c *sentinelClient) aclSetUser(ctx context.Context, rdb *redis.Client, username, password string) error {
+ args := []interface{}{
+ "ACL", "SETUSER", username,
+ "on", // enable the user
+ "resetpass", // drop any previously configured passwords
+ ">" + password, // set the new password
+ "resetkeys", // drop any previous key patterns
+ "~" + username + ":*", // tenant cache key prefix
+ "~lock:" + username + ":*", // tenant lock key prefix
+ "resetchannels", // no pub/sub channel access
+ "nocommands", // start from zero commands...
+ "+@read", // ...then allow the read category
+ "+@write", // ...the write category
+ "+ping", // ...and PING for health checks
+ }
+ if err := rdb.Do(ctx, args...).Err(); err != nil {
+ return fmt.Errorf("ACL SETUSER %s: %w", username, err)
+ }
+ return nil
+}
+
+// rdo runs a single Redis command on the given client and returns only its error.
+// Thin helper so callers can wrap command errors with context uniformly.
+func (c *sentinelClient) rdo(ctx context.Context, rdb *redis.Client, args ...interface{}) error {
+ return rdb.Do(ctx, args...).Err()
+}
+
+// forEachReplica discovers all current replicas via Sentinel and runs fn on each.
+// It tries each sentinel address until one responds, avoiding a single point of
+// failure. All replica errors are collected and logged; provisioning continues
+// even if individual replicas are unreachable.
+func (c *sentinelClient) forEachReplica(ctx context.Context, fn func(*redis.Client) error) error {
+ if len(c.cfg.SentinelAddrs) == 0 {
+ return nil // direct client — no replicas to discover
+ }
+ logger := log.FromContext(ctx)
+
+ replicas, err := c.sentinelReplicas(ctx)
+ if err != nil {
+ return fmt.Errorf("discover replicas: %w", err)
+ }
+
+ var errs []error
+ for _, r := range replicas {
+ // Sentinel returns replica metadata as a map; "ip" and "port" hold the address.
+ addr := r["ip"] + ":" + r["port"]
+ // A fresh short-lived connection per replica per call; closed below.
+ rdb := redis.NewClient(&redis.Options{
+ Addr: addr,
+ Password: c.cfg.AdminPassword,
+ })
+ if fnErr := fn(rdb); fnErr != nil {
+ logger.Error(fnErr, "ACL operation failed on replica", "addr", addr)
+ errs = append(errs, fmt.Errorf("replica %s: %w", addr, fnErr))
+ }
+ _ = rdb.Close()
+ }
+ // nil when every replica succeeded; otherwise all failures joined.
+ return errors.Join(errs...)
+}
+
+// sentinelReplicas queries each sentinel address until one returns the replica list.
+// NOTE(review): if SentinelAddrs were empty, lastErr would be nil and the final
+// error would wrap nil — callers currently guard against the empty case; verify
+// if new call sites are added.
+func (c *sentinelClient) sentinelReplicas(ctx context.Context) ([]map[string]string, error) {
+ var lastErr error
+ for _, addr := range c.cfg.SentinelAddrs {
+ sc := redis.NewSentinelClient(&redis.Options{
+ Addr: addr,
+ Password: c.cfg.AdminPassword,
+ })
+ replicas, err := sc.Replicas(ctx, c.cfg.MasterName).Result()
+ _ = sc.Close()
+ if err == nil {
+ return replicas, nil
+ }
+ // Remember the most recent failure; try the next sentinel.
+ lastErr = err
+ }
+ return nil, fmt.Errorf("all sentinels unreachable: %w", lastErr)
+}
+
+// NoopClient is a Client implementation that does nothing, used when Valkey
+// configuration is absent (e.g., local development or auth not yet enabled).
+// UserExists always reports false, so callers treat every tenant as
+// unprovisioned; all other operations succeed silently.
+type NoopClient struct{}
+
+func (NoopClient) UpsertUser(_ context.Context, _, _ string) error { return nil }
+func (NoopClient) DeleteUser(_ context.Context, _ string) error { return nil }
+func (NoopClient) UserExists(_ context.Context, _ string) (bool, error) { return false, nil }
+func (NoopClient) WatchFailover(_ context.Context, _ func()) error { return nil }
+func (NoopClient) Close() error { return nil }
diff --git a/internal/webhook/v1/service_webhook.go b/internal/webhook/v1/service_webhook.go
index b4d9e43..3351c0c 100644
--- a/internal/webhook/v1/service_webhook.go
+++ b/internal/webhook/v1/service_webhook.go
@@ -40,6 +40,7 @@ const (
decofileInjectAnnot = "deco.sites/decofile-inject"
decofileMountPathAnnot = "deco.sites/decofile-mount-path"
deploymentIdLabel = "app.deco/deploymentId"
+ valkeyACLSecretName = "valkey-acl"
)
// nolint:unused
@@ -290,11 +291,40 @@ func (d *ServiceCustomDefaulter) Default(ctx context.Context, obj runtime.Object
}
service.Spec.Template.Labels[deploymentIdLabel] = deploymentId
+ // Inject valkey-acl Secret as envFrom so pods receive per-tenant Valkey credentials.
+ // optional=true ensures pods start even before the Secret is provisioned by the operator,
+ // falling back to deco's FILE_SYSTEM cache in the meantime.
+ d.addOrUpdateValkeyEnvFrom(service)
+
servicelog.Info("Successfully injected Decofile into Service", "service", service.Name, "deploymentId", deploymentId, "configmap", decofile.ConfigMapName())
return nil
}
+// addOrUpdateValkeyEnvFrom injects the valkey-acl Secret as an envFrom source into the
+// target container. It is idempotent: if the reference already exists, it is left unchanged.
+// optional=true lets pods start before the operator provisions the Secret; deco
+// falls back to its FILE_SYSTEM cache until the credentials appear.
+func (d *ServiceCustomDefaulter) addOrUpdateValkeyEnvFrom(service *servingknativedevv1.Service) {
+ // Knative RevisionSpec embeds corev1.PodSpec, so Containers is a promoted
+ // field — access it consistently (previously mixed Spec.Containers and
+ // Spec.PodSpec.Containers for the same data).
+ containers := service.Spec.Template.Spec.Containers
+ if len(containers) == 0 {
+ return
+ }
+ idx := d.findTargetContainer(service)
+ if idx < 0 || idx >= len(containers) {
+ // Defensive bounds guard: never index out of range if the target
+ // container lookup returns an unexpected value.
+ return
+ }
+ for _, ef := range containers[idx].EnvFrom {
+ if ef.SecretRef != nil && ef.SecretRef.Name == valkeyACLSecretName {
+ return // already present
+ }
+ }
+ optional := true
+ service.Spec.Template.Spec.Containers[idx].EnvFrom = append(
+ service.Spec.Template.Spec.Containers[idx].EnvFrom,
+ corev1.EnvFromSource{
+ SecretRef: &corev1.SecretEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{Name: valkeyACLSecretName},
+ Optional: &optional,
+ },
+ },
+ )
+}
+
// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
// NOTE: The 'path' attribute must follow a specific pattern and should not be modified directly here.
// Modifying the path for an invalid path can cause API server errors; failing to locate the webhook.