Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ require (
github.com/goccy/go-json v0.10.6
github.com/gojuno/minimock/v3 v3.4.7
github.com/itchyny/gojq v0.12.17
github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f
github.com/muesli/termenv v0.16.0
github.com/onsi/ginkgo/v2 v2.27.5
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
sigs.k8s.io/controller-runtime v0.20.4
)

require (
Expand Down Expand Up @@ -140,7 +142,6 @@ require (
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/controller-runtime v0.20.4 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/kustomize/api v0.19.0 // indirect
sigs.k8s.io/kustomize/kyaml v0.19.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f h1:hTyM8+nWGxBczLaa0HzjXbKJuMjdbZQa9ZB2F0wdO04=
github.com/ldmonster/kubeclient v0.0.0-20260522082709-ed73652c723f/go.mod h1:Q8GOTVz5hMCvWJjTmeLRQ79yp+AkX76yuNL/R66gybk=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down
13 changes: 7 additions & 6 deletions pkg/app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,13 @@ err := so.AssembleCommonOperatorFromConfig(cfg, []string{
```

The new `AssembleCommonOperatorFromConfig(cfg, labels)` method on
`*ShellOperator` is what makes this clean — it derives both
`KubeClientConfig`s (main + object-patcher), the HTTP listen address/port,
and the metric prefix from `cfg`, so the consumer does not have to unpack
fields by hand. The older primitive-taking
`AssembleCommonOperator(listenAddress, listenPort, labels, mainKubeCfg, patcherKubeCfg)`
is still available for callers that need finer control.
`*ShellOperator` is what makes this clean — it derives the singleton
deduplicated Kubernetes client configuration, the HTTP listen address/port, and
the metric prefix from `cfg`, so the consumer does not have to unpack fields by
hand. For callers that need finer control, use
`AssembleCommonOperator(listenAddress, listenPort, labels, kubeCfg, dedupCfg)`;
it still creates exactly one singleton Kubernetes client unless a client was
already injected with `shell_operator.WithKubeClient(...)`.

If you also want env-var parsing on top of your own values, call
`ParseEnv(cfg)` between steps 1 and 2 — env values will overlay the fields you
Expand Down
28 changes: 28 additions & 0 deletions pkg/app/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ type ObjectPatcherSettings struct {
KubeClientTimeout time.Duration `env:"KUBE_CLIENT_TIMEOUT"`
}

// DedupClientSettings configures the deduplicated kubeclient cache provided by
// github.com/ldmonster/kubeclient. The cache stores a single canonical copy of
// each repeated value and subtree across watched objects, dramatically lowering
// in-memory footprint for clusters with many similar resources (e.g.
// templated Deployments). All settings here are optional; when Enabled is
// false the client is not constructed at all. List-typed env vars use a comma
// separator: GVK strings follow the form "<group>/<version>/<kind>" (the
// group may be empty for core resources, e.g. "/v1/Pod"). Enabled is
// deprecated and ignored; the singleton dedup client is always constructed.
type DedupClientSettings struct {
Enabled bool `env:"ENABLED"`
Namespaces []string `env:"NAMESPACES" envSeparator:","`
WatchGVKs []string `env:"WATCH_GVKS" envSeparator:","`
ReconstructLRUSize int `env:"RECONSTRUCT_LRU_SIZE"`
GCInterval time.Duration `env:"GC_INTERVAL"`

// SnapshotStore enables a process-wide deduplicated SnapshotStore that
// backs every kubernetes-binding monitor's per-object cache. When on,
// `*Unstructured` bodies live exactly once in memory across all
// resourceInformers (refcounted), trading a small per-snapshot-read CPU
// cost for a substantial drop in RSS for workloads with many similar
// objects. The singleton client already moves informer list/watch storage
// to the dedup path; SnapshotStore moves monitor snapshot storage to a
// shared dedup store.
SnapshotStore bool `env:"SNAPSHOT_STORE"`
}

// AdmissionSettings holds settings for the validating-webhook server.
type AdmissionSettings struct {
ConfigurationName string `env:"CONFIGURATION_NAME"`
Expand Down Expand Up @@ -83,6 +110,7 @@ type Config struct {
App AppSettings `envPrefix:"SHELL_OPERATOR_"`
Kube KubeSettings `envPrefix:"KUBE_"`
ObjectPatcher ObjectPatcherSettings `envPrefix:"OBJECT_PATCHER_"`
DedupClient DedupClientSettings `envPrefix:"DEDUP_CLIENT_"`
Admission AdmissionSettings `envPrefix:"VALIDATING_WEBHOOK_"`
Conversion ConversionSettings `envPrefix:"CONVERSION_WEBHOOK_"`
Debug DebugSettings `envPrefix:"DEBUG_"`
Expand Down
1 change: 0 additions & 1 deletion pkg/app/app_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,3 @@ func TestCLIFlagOverridesEnv(t *testing.T) {
assert.Equal(t, "1234", cfg.App.ListenPort)
assert.Equal(t, "debug", cfg.Log.Level)
}

50 changes: 50 additions & 0 deletions pkg/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func BindFlags(cfg *Config, rootCmd *cobra.Command, cmd *cobra.Command) func() {
bindLogFlags(cfg, cmd)
applyAdmission := bindAdmissionWebhookFlags(cfg, cmd)
applyConversion := bindConversionWebhookFlags(cfg, cmd)
applyDedup := bindDedupClientFlags(cfg, cmd)
bindDebugFlags(cfg, rootCmd, cmd)

return func() {
applyAdmission()
applyConversion()
applyDedup()
}
}

Expand Down Expand Up @@ -106,6 +108,54 @@ func bindConversionWebhookFlags(cfg *Config, cmd *cobra.Command) func() {
}
}

// bindDedupClientFlags registers flags for the deduplicated kubeclient cache.
// The two []string fields (Namespaces and WatchGVKs) follow the same pattern
// used by the validating-webhook ClientCA flag: any explicit CLI invocation
// fully replaces the env-var derived slice; otherwise the env value is kept.
func bindDedupClientFlags(cfg *Config, cmd *cobra.Command) func() {
f := cmd.Flags()
f.BoolVar(&cfg.DedupClient.Enabled, "dedup-client-enabled", cfg.DedupClient.Enabled,
"Deprecated no-op retained for compatibility. The singleton deduplicated kubeclient "+
"cache (github.com/ldmonster/kubeclient) is always constructed. Can be set with $DEDUP_CLIENT_ENABLED.")
f.BoolVar(&cfg.DedupClient.SnapshotStore, "dedup-client-snapshot-store", cfg.DedupClient.SnapshotStore,
"Back per-monitor object snapshots with a process-wide deduplicated store "+
"(github.com/ldmonster/kubeclient/store). Trades a small per-snapshot-read CPU "+
"cost for a substantial drop in RSS when many monitors observe similar objects. "+
"Can be set with $DEDUP_CLIENT_SNAPSHOT_STORE.")
f.IntVar(&cfg.DedupClient.ReconstructLRUSize, "dedup-client-reconstruct-lru-size",
cfg.DedupClient.ReconstructLRUSize,
"Size of the LRU that memoises reconstructed Unstructured objects in the dedup cache. "+
"Zero disables reconstruction caching. Can be set with $DEDUP_CLIENT_RECONSTRUCT_LRU_SIZE.")
f.DurationVar(&cfg.DedupClient.GCInterval, "dedup-client-gc-interval",
cfg.DedupClient.GCInterval,
"How often the deduplicated store reclaims unused interned values and subtrees. "+
"Zero leaves the kubeclient default in place. Can be set with $DEDUP_CLIENT_GC_INTERVAL.")

envNamespaces := cfg.DedupClient.Namespaces
envGVKs := cfg.DedupClient.WatchGVKs
var cliNamespaces, cliGVKs []string
f.StringArrayVar(&cliNamespaces, "dedup-client-namespace", nil,
"Namespace to restrict the dedup cache to. Repeat the flag to add more, or pass a "+
"comma-separated list via $DEDUP_CLIENT_NAMESPACES. Empty means all namespaces.")
f.StringArrayVar(&cliGVKs, "dedup-client-watch-gvk", nil,
"GroupVersionKind to pre-register with the dedup cache, formatted as "+
"\"<group>/<version>/<kind>\" (the group is empty for core resources, e.g. \"/v1/Pod\"). "+
"Repeat the flag to add more, or pass a comma-separated list via $DEDUP_CLIENT_WATCH_GVKS.")

return func() {
if len(cliNamespaces) > 0 {
cfg.DedupClient.Namespaces = cliNamespaces
} else {
cfg.DedupClient.Namespaces = envNamespaces
}
if len(cliGVKs) > 0 {
cfg.DedupClient.WatchGVKs = cliGVKs
} else {
cfg.DedupClient.WatchGVKs = envGVKs
}
}
}

func bindLogFlags(cfg *Config, cmd *cobra.Command) {
f := cmd.Flags()
f.StringVar(&cfg.Log.Level, "log-level", cfg.Log.Level, "Logging level: debug, info, error. Default is info. Can be set with $LOG_LEVEL.")
Expand Down
5 changes: 4 additions & 1 deletion pkg/hook/controller/hook_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import (
bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context"
"github.com/flant/shell-operator/pkg/hook/config"
"github.com/flant/shell-operator/pkg/hook/types"
"github.com/flant/shell-operator/pkg/kube/dedupclient"
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
"k8s.io/client-go/discovery/cached/memory"
)

// Test updating snapshots for combined contexts.
func Test_UpdateSnapshots(t *testing.T) {
g := NewWithT(t)

fc := fake.NewFakeCluster(fake.ClusterVersionV121)
mgr := kubeeventsmanager.NewKubeEventsManager(context.Background(), fc.Client, log.NewNop())
kubeClient := dedupclient.NewFromClients(fc.Client, fc.Client.Dynamic(), nil, memory.NewMemCacheClient(fc.Client.Discovery()))
mgr := kubeeventsmanager.NewKubeEventsManager(context.Background(), kubeClient, log.NewNop())

testHookConfig := `
configVersion: v1
Expand Down
126 changes: 126 additions & 0 deletions pkg/kube/dedupclient/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# pkg/kube/dedupclient

Two-part integration of
[`github.com/ldmonster/kubeclient`](https://github.com/ldmonster/kubeclient)
into shell-operator. Both parts can be enabled independently and target
different memory holders in the Kubernetes binding path.

| Component | Type | Purpose | Flag |
| ---------------- | -------------------------- | -------------------------------------------------------------------------------------------------------- | ----------------------------------- |
| `Client` | `*kubeclient.DedupClient` singleton facade | The only Kubernetes client shell-operator constructs. It exposes controller-runtime, dynamic, typed, discovery, and RESTMapper APIs, and backs kube-events-manager resource informers with dedup stores. | always on |
| `SnapshotStore` | `*store.DedupStore` wrapper | Process-wide, reference-counted cache that backs every kube-events-manager monitor's per-object snapshot. | `--dedup-client-snapshot-store` |

For clusters with thousands of similar resources (e.g. templated
`Deployment`s) the upstream store reports **60–90 %** lower cache memory
usage thanks to value interning and subtree deduplication.

## Quick start

```go
client, err := dedupclient.New(dedupclient.Config{
Context: "kind-dev", // optional kubeconfig context
Config: "/path/to/kubeconfig", // optional kubeconfig path
QPS: 20,
Burst: 40,
Namespaces: []string{"kube-system", "default"}, // empty = all
WatchGVKs: []schema.GroupVersionKind{
{Group: "", Version: "v1", Kind: "Pod"},
{Group: "apps", Version: "v1", Kind: "Deployment"},
},
ReconstructLRUSize: 4096, // 0 disables reconstruction caching
}, logger)
```

`Start(ctx)` spins up the cache run loop in a single dedicated goroutine and
returns immediately. `Shutdown(ctx)` cancels the loop and waits for it to
exit (or for `ctx` to expire).

## How shell-operator wires it up

`AssembleCommonOperatorFromConfig` always constructs one `*dedupclient.Client`
unless a library caller has already injected one with
`shell_operator.WithKubeClient(...)`. The resulting singleton is stored on
`shell_operator.ShellOperator.KubeClient`, started during `op.Start()`, and
stopped from `op.Shutdown()`. Kubernetes binding resource informers use the
singleton's dedup-backed informer factory; namespace label-selector informers
still use the singleton's typed client-go namespace interface.

Configuration knobs (env vars / CLI flags):

| Env var | Flag | Meaning |
| ------------------------------------ | ------------------------------------ | ------------------------------------------------------------ |
| `DEDUP_CLIENT_ENABLED` | `--dedup-client-enabled` | Deprecated no-op; the singleton dedup client is always constructed. |
| `DEDUP_CLIENT_SNAPSHOT_STORE` | `--dedup-client-snapshot-store` | Back per-monitor snapshots with the shared dedup store. |
| `DEDUP_CLIENT_NAMESPACES` | `--dedup-client-namespace` | Comma-separated (env) or repeated (flag) namespace allow-list. Empty = all. |
| `DEDUP_CLIENT_WATCH_GVKS` | `--dedup-client-watch-gvk` | GVKs to pre-register, formatted as `<group>/<version>/<kind>` (group empty for core). |
| `DEDUP_CLIENT_RECONSTRUCT_LRU_SIZE` | `--dedup-client-reconstruct-lru-size`| LRU size for reconstructed Unstructured objects. 0 disables. |
| `DEDUP_CLIENT_GC_INTERVAL` | `--dedup-client-gc-interval` | GC interval for unused interned values/subtrees. 0 = upstream default. |

The singleton client is always constructed. Optional knobs only tune cache
scope, reconstruction, GC, and monitor snapshot storage.

## SnapshotStore — monitor snapshot memory

`SnapshotStore` plugs into shell-operator's `pkg/kube_events_manager` so that
every monitor's `cachedObjects` map stops holding `*Unstructured` pointers
and instead stores `(resourceId → store.ObjectKey)` references into a
process-wide, reference-counted dedup store.

What changes when the flag is on (`MonitorConfig.KeepFullObjectsInMemory == true`):

- Each `resourceInformer` calls `SnapshotStore.Acquire(ownerID, key, obj)` on
initial-list and on Add/Modified events. The store de-duplicates field
values and subtrees across every object it holds.
- The per-monitor `*ObjectAndFilterResult` keeps `Object == nil`; the
authoritative body lives once in the store.
- `monitor.Snapshot()` reconstitutes `Object` lazily by calling
`SnapshotStore.Get(key)`. Reconstitution is a fresh allocation per call,
which trades a small CPU cost for the memory drop.
- On informer shutdown, all keys held by that informer are released. The
underlying object is removed from the store only when the last owner
releases it, so overlapping watches are correctly handled.

When `KeepFullObjectsInMemory == false`, the existing "no full body kept"
path takes precedence and the store is bypassed for that monitor — there is
no benefit to deduplicating bodies you've already chosen to discard.

### When does it actually save memory?

The win scales with two factors:

1. **Cross-factory duplication.** Each unique
`(GVR, namespace, fieldSelector, labelSelector)` gets its own client-go
informer cache today. When several monitors observe overlapping object
sets through *different* selectors, every cache holds its own copy. Once
`SnapshotStore` is on, the bodies converge to a single deduplicated copy
regardless of how many monitors observe them.
2. **Intra-object subtree duplication.** Even within a single GVR, similar
objects share substantial structure — e.g. a thousand Pods generated
from one template share `securityContext`, `tolerations`, `resources`,
and most label/annotation keys. Value interning + subtree dedup encode
those shared parts once.

If your hooks rarely call `Snapshot()` on each event the CPU cost of
reconstruction is negligible; if they do (and operate on huge snapshots),
benchmark before turning it on.

## Debug endpoint

Once registered (automatically in `bootstrap.go`), the debug server exposes:

```
GET /dedup-client/status.{json|yaml|text}
```

The response carries the status of both components:

```json
{
"client": { "enabled": true, "cacheSyncedHint": false },
"snapshotStore": { "enabled": true, "liveObjects": 1284, "totalAcquires": 5012, "totalReleases": 3728, "totalDeletes": 211 }
}
```

Each component reports `enabled: false` with a clear `reason` when its flag
is not set, so liveness probes can distinguish "not configured" from
"errored".
Loading