diff --git a/cmd/shell-operator/main.go b/cmd/shell-operator/main.go index 015e469d..92573603 100644 --- a/cmd/shell-operator/main.go +++ b/cmd/shell-operator/main.go @@ -57,6 +57,11 @@ func main() { applySliceFlags := app.BindFlags(cfg, rootCmd, startCmd) startCmd.PreRunE = func(_ *cobra.Command, _ []string) error { applySliceFlags() + // Sync the resolved debug socket path into the package-level CLI + // default so any debug-* sub-command launched in the same process + // (e.g. embedded test harnesses) sees the same value the operator + // will bind to. + debug.DefaultSocketPath = cfg.Debug.UnixSocket return nil } rootCmd.AddCommand(startCmd) @@ -68,6 +73,7 @@ func main() { rootCmd.RunE = start(logger, cfg) rootCmd.PreRunE = func(_ *cobra.Command, _ []string) error { applySliceFlags() + debug.DefaultSocketPath = cfg.Debug.UnixSocket return nil } diff --git a/cmd/shell-operator/start.go b/cmd/shell-operator/start.go index bd168ede..40f62689 100644 --- a/cmd/shell-operator/start.go +++ b/cmd/shell-operator/start.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "os" + "os/signal" "strings" + "syscall" "github.com/deckhouse/deckhouse/pkg/log" "github.com/spf13/cobra" @@ -15,9 +17,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.30.0" "github.com/flant/shell-operator/pkg/app" - "github.com/flant/shell-operator/pkg/metrics" shell_operator "github.com/flant/shell-operator/pkg/shell-operator" - utils_signal "github.com/flant/shell-operator/pkg/utils/signal" ) const ( @@ -25,31 +25,29 @@ const ( AppDescription = "Shell-operator is a tool for running event-driven scripts in a Kubernetes cluster" ) +// start returns the cobra RunE used by the "start" sub-command. It owns the +// process-wide concerns (signal handling, OpenTelemetry, log defaults) and +// delegates the actual lifecycle to shell-operator's Run method, which Starts, +// blocks until ctx is done, and synchronously Shuts down. func start(logger *log.Logger, cfg *app.Config) func(cmd *cobra.Command, args []string) error { return func(_ *cobra.Command, _ []string) error { - app.AppStartMessage = fmt.Sprintf("%s %s", app.AppName, app.Version) - ctx := context.Background() + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + telemetryShutdown := registerTelemetry(ctx) + defer func() { _ = telemetryShutdown(context.Background()) }() - // Initialize metric names with the configured prefix. - metrics.InitMetrics(cfg.App.PrometheusMetricsPrefix) + fmt.Println(app.AppName, app.Version) - // Init logging and initialize a ShellOperator instance. - operator, err := shell_operator.Init(ctx, cfg, logger.Named("shell-operator")) + op, err := shell_operator.NewShellOperator(ctx, cfg, shell_operator.WithLogger(logger.Named("shell-operator"))) if err != nil { - return fmt.Errorf("init failed: %w", err) + return fmt.Errorf("build shell-operator: %w", err) } - operator.Start() - - // Block until OS signal. - utils_signal.WaitForProcessInterruption(func() { - operator.Shutdown() - _ = telemetryShutdown(ctx) - os.Exit(1) - }) - - return nil + // Run blocks until ctx is canceled (typically by SIGINT/SIGTERM) and + // performs a synchronous Shutdown before returning. No os.Exit on the + // graceful path; the caller (cobra) maps a nil return to exit 0. + return op.Run(ctx) } } diff --git a/go.mod b/go.mod index 2f3e8d93..73c3899a 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/onsi/ginkgo/v2 v2.27.5 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 + go.uber.org/goleak v1.3.0 ) require ( diff --git a/pkg/app/app.go b/pkg/app/app.go index e032f0e2..3e94cce2 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -1,9 +1,11 @@ package app +// AppName and AppDescription are read-only product identifiers, safe to use +// from anywhere. AppStartMessage was previously a mutable global rewritten at +// startup; library consumers now build their own banner from AppName/Version. var ( - AppName = "shell-operator" - AppDescription = "Run your custom cluster-wide scripts in reaction to Kubernetes events or on schedule." - AppStartMessage = "shell-operator" + AppName = "shell-operator" + AppDescription = "Run your custom cluster-wide scripts in reaction to Kubernetes events or on schedule." ) var Version = "v1.2.0-dev" diff --git a/pkg/app/debug.go b/pkg/app/debug.go index 86856225..84c22c8e 100644 --- a/pkg/app/debug.go +++ b/pkg/app/debug.go @@ -1,39 +1,9 @@ -package app - -import ( - "github.com/spf13/cobra" -) - -// DebugUnixSocket is the default path for the debug unix socket. -// It is used as the binding target for the --debug-unix-socket flag on debug -// sub-commands (queue, hook, etc.) that connect to a running operator. -// For the start command, cfg.Debug.UnixSocket is preferred; see flags.go. -var DebugUnixSocket = "/var/run/shell-operator/debug.socket" - -// ApplyConfig copies values from cfg into package-level globals that pre-date -// the Config struct and therefore cannot be populated by caarlos0/env (today -// just DebugUnixSocket). -// -// Call it whenever cfg may have been built outside the CLI flow — most -// importantly when an outer program (e.g. addon-operator) assembles its own -// *Config and hands it to shell-operator without going through BindFlags. -// After this call, debug sub-commands that bind --debug-unix-socket against -// the global will see cfg.Debug.UnixSocket as their default. +// Package app intentionally no longer exposes the DebugUnixSocket global or +// ApplyConfig helper that used to live here. The source of truth for the +// debug socket path is now cfg.Debug.UnixSocket (see app_config.go); CLI +// commands that need to connect to a running operator bind their +// --debug-unix-socket flag against the helper in package debug instead. // -// A nil cfg is a no-op so callers don't need to guard. ApplyConfig is also -// invoked from BindFlags and shell_operator.Init, so most users get the -// override for free; calling it again is safe and idempotent. -func ApplyConfig(cfg *Config) { - if cfg == nil { - return - } - DebugUnixSocket = cfg.Debug.UnixSocket -} - -// DefineDebugUnixSocketFlag registers the --debug-unix-socket flag on cmd, -// binding it to the DebugUnixSocket global. Called by debug sub-commands that -// need to locate the operator's debug socket. -func DefineDebugUnixSocketFlag(cmd *cobra.Command) { - cmd.Flags().StringVar(&DebugUnixSocket, "debug-unix-socket", DebugUnixSocket, "A path to a unix socket for a debug endpoint.") - _ = cmd.Flags().MarkHidden("debug-unix-socket") -} +// This file is kept (empty) so external code that imports the package keeps +// compiling; new code should not add package-level mutable state here. +package app diff --git a/pkg/app/debug_test.go b/pkg/app/debug_test.go index 03d6c5df..582bdc90 100644 --- a/pkg/app/debug_test.go +++ b/pkg/app/debug_test.go @@ -21,30 +21,3 @@ func TestDebugKeepTempFilesIsBool(t *testing.T) { cfg.Debug.KeepTempFiles = false assert.False(t, cfg.Debug.KeepTempFiles) } - -// TestApplyConfig_OverridesDebugUnixSocket guarantees that handing a *Config -// built by an outer program to ApplyConfig replaces the package-level -// DebugUnixSocket global, even when BindFlags is never called. -func TestApplyConfig_OverridesDebugUnixSocket(t *testing.T) { - prev := DebugUnixSocket - t.Cleanup(func() { DebugUnixSocket = prev }) - - cfg := NewConfig() - cfg.Debug.UnixSocket = "/run/outer-program/debug.socket" - - ApplyConfig(cfg) - - assert.Equal(t, "/run/outer-program/debug.socket", DebugUnixSocket) -} - -// TestApplyConfig_NilIsNoop documents that ApplyConfig tolerates a nil cfg -// so callers don't need to guard at every call site. -func TestApplyConfig_NilIsNoop(t *testing.T) { - prev := DebugUnixSocket - t.Cleanup(func() { DebugUnixSocket = prev }) - - DebugUnixSocket = "/run/sentinel.socket" - ApplyConfig(nil) - - assert.Equal(t, "/run/sentinel.socket", DebugUnixSocket) -} diff --git a/pkg/app/flags.go b/pkg/app/flags.go index 810dfcfe..f5192264 100644 --- a/pkg/app/flags.go +++ b/pkg/app/flags.go @@ -115,10 +115,9 @@ func bindLogFlags(cfg *Config, cmd *cobra.Command) { } func bindDebugFlags(cfg *Config, rootCmd *cobra.Command, cmd *cobra.Command) { - // Sync the package-level global so debug sub-commands (queue, hook, etc.) - // that bind to DebugUnixSocket get the env/default value. - ApplyConfig(cfg) - + // Note: cfg.Debug.UnixSocket is the source of truth. The CLI (main.go) + // is responsible for syncing it into debug.DefaultSocketPath after flag + // parsing so debug sub-commands can connect to the same socket. f := cmd.Flags() f.StringVar(&cfg.Debug.UnixSocket, "debug-unix-socket", cfg.Debug.UnixSocket, "A path to a unix socket for a debug endpoint. Can be set with $DEBUG_UNIX_SOCKET.") _ = f.MarkHidden("debug-unix-socket") diff --git a/pkg/debug/cli_socket.go b/pkg/debug/cli_socket.go new file mode 100644 index 00000000..02beab74 --- /dev/null +++ b/pkg/debug/cli_socket.go @@ -0,0 +1,32 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package debug + +import ( + "github.com/spf13/cobra" +) + +// DefaultSocketPath is the path the debug CLI client (DefaultClient) connects +// to when no explicit socket has been configured. It is a CLI concern only — +// library consumers should build a debug.Client with NewClient(path) directly. +var DefaultSocketPath = "/var/run/shell-operator/debug.socket" + +// DefineSocketFlag binds the --debug-unix-socket flag on cmd to the +// process-level DefaultSocketPath used by DefaultClient(). Called by debug +// sub-commands (queue, hook, config, raw). +func DefineSocketFlag(cmd *cobra.Command) { + cmd.Flags().StringVar(&DefaultSocketPath, "debug-unix-socket", DefaultSocketPath, "A path to a unix socket for a debug endpoint.") + _ = cmd.Flags().MarkHidden("debug-unix-socket") +} diff --git a/pkg/debug/client.go b/pkg/debug/client.go index ddbf7f50..1a0f4dc8 100644 --- a/pkg/debug/client.go +++ b/pkg/debug/client.go @@ -9,7 +9,6 @@ import ( "net/http" "time" - "github.com/flant/shell-operator/pkg/app" utils "github.com/flant/shell-operator/pkg/utils/file" ) @@ -53,8 +52,11 @@ func (c *Client) Close() { } } +// DefaultClient connects to the unix socket pointed at by DefaultSocketPath. +// CLI commands bind --debug-unix-socket against DefaultSocketPath via +// DefineSocketFlag so that this function picks up any flag-supplied override. func DefaultClient() (*Client, error) { - return NewClient(app.DebugUnixSocket) + return NewClient(DefaultSocketPath) } func (c *Client) Get(url string) ([]byte, error) { diff --git a/pkg/debug/debug-cmd.go b/pkg/debug/debug-cmd.go index a36640b3..c7bc97e7 100644 --- a/pkg/debug/debug-cmd.go +++ b/pkg/debug/debug-cmd.go @@ -8,8 +8,6 @@ import ( "github.com/muesli/termenv" "github.com/spf13/cobra" - - "github.com/flant/shell-operator/pkg/app" ) var ( @@ -86,7 +84,7 @@ func DefineDebugCommands(rootCmd *cobra.Command) { queueListCmd.Flags().BoolVarP(&watch, "watch", "w", false, "Keep watching.") queueListCmd.Flags().StringVarP(&watchInterval, "watchInterval", "t", watchInterval, "Watch refresh interval.") AddOutputJsonYamlTextFlag(queueListCmd) - app.DefineDebugUnixSocketFlag(queueListCmd) + DefineSocketFlag(queueListCmd) queueCmd.AddCommand(queueListCmd) queueMainCmd := &cobra.Command{ @@ -106,7 +104,7 @@ func DefineDebugCommands(rootCmd *cobra.Command) { }, } AddOutputJsonYamlTextFlag(queueMainCmd) - app.DefineDebugUnixSocketFlag(queueMainCmd) + DefineSocketFlag(queueMainCmd) queueCmd.AddCommand(queueMainCmd) // Runtime config command. @@ -130,7 +128,7 @@ func DefineDebugCommands(rootCmd *cobra.Command) { }, } AddOutputJsonYamlTextFlag(configListCmd) - app.DefineDebugUnixSocketFlag(configListCmd) + DefineSocketFlag(configListCmd) configCmd.AddCommand(configListCmd) var paramName string @@ -162,7 +160,7 @@ func DefineDebugCommands(rootCmd *cobra.Command) { return nil }, } - app.DefineDebugUnixSocketFlag(configSetCmd) + DefineSocketFlag(configSetCmd) configCmd.AddCommand(configSetCmd) // Raw request command @@ -188,7 +186,7 @@ func DefineDebugCommands(rootCmd *cobra.Command) { return nil }, } - app.DefineDebugUnixSocketFlag(rawCommand) + DefineSocketFlag(rawCommand) rootCmd.AddCommand(rawCommand) } @@ -213,7 +211,7 @@ func DefineDebugCommandsSelf(rootCmd *cobra.Command) { }, } AddOutputJsonYamlTextFlag(hookListCmd) - app.DefineDebugUnixSocketFlag(hookListCmd) + DefineSocketFlag(hookListCmd) hookCmd.AddCommand(hookListCmd) hookSnapshotCmd := &cobra.Command{ @@ -234,7 +232,7 @@ func DefineDebugCommandsSelf(rootCmd *cobra.Command) { }, } AddOutputJsonYamlTextFlag(hookSnapshotCmd) - app.DefineDebugUnixSocketFlag(hookSnapshotCmd) + DefineSocketFlag(hookSnapshotCmd) hookCmd.AddCommand(hookSnapshotCmd) } diff --git a/pkg/debug/server.go b/pkg/debug/server.go index 1b85cb36..d2cefe7f 100644 --- a/pkg/debug/server.go +++ b/pkg/debug/server.go @@ -1,6 +1,8 @@ package debug import ( + "context" + "errors" "fmt" "io" "log/slog" @@ -10,6 +12,7 @@ import ( "path" "path/filepath" "strings" + "sync" "github.com/deckhouse/deckhouse/pkg/log" "github.com/go-chi/chi/v5" @@ -29,6 +32,13 @@ type Server struct { Router chi.Router + mu sync.Mutex + unixSrv *http.Server + unixDone chan struct{} + httpSrv *http.Server + httpDone chan struct{} + socketPath string // captured at Init time so Shutdown can remove it + logger *log.Logger } @@ -65,7 +75,6 @@ func (s *Server) Init() error { } } - // Check if socket is available listener, err := net.Listen("unix", address) if err != nil { return fmt.Errorf("Debug HTTP server fail to listen on '%s': %w", address, err) @@ -73,18 +82,38 @@ func (s *Server) Init() error { s.logger.Info("Debug endpoint listen on address", slog.String(pkg.LogKeyAddress, address)) + unixSrv := &http.Server{Handler: s.Router} + unixDone := make(chan struct{}) + + s.mu.Lock() + s.unixSrv = unixSrv + s.unixDone = unixDone + s.socketPath = address + s.mu.Unlock() + go func() { - if err := http.Serve(listener, s.Router); err != nil { - s.logger.Error("Error starting Debug socket server", log.Err(err)) - os.Exit(1) + defer close(unixDone) + if err := unixSrv.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.logger.Error("Debug unix socket server stopped with error", log.Err(err)) } }() if s.HttpAddr != "" { + httpSrv := &http.Server{ + Addr: s.HttpAddr, + Handler: s.Router, + } + httpDone := make(chan struct{}) + + s.mu.Lock() + s.httpSrv = httpSrv + s.httpDone = httpDone + s.mu.Unlock() + go func() { - if err := http.ListenAndServe(s.HttpAddr, s.Router); err != nil { - s.logger.Error("Error starting Debug HTTP server", log.Err(err)) - os.Exit(1) + defer close(httpDone) + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.logger.Error("Debug HTTP server stopped with error", log.Err(err)) } }() } @@ -92,6 +121,57 @@ func (s *Server) Init() error { return nil } +// Shutdown gracefully stops both the unix-socket server and the optional HTTP +// server. The unix socket file is removed on the way out. Safe to call when +// Init was never run or after a previous Shutdown. +func (s *Server) Shutdown(ctx context.Context) error { + s.mu.Lock() + unixSrv := s.unixSrv + unixDone := s.unixDone + httpSrv := s.httpSrv + httpDone := s.httpDone + socketPath := s.socketPath + s.unixSrv = nil + s.httpSrv = nil + s.mu.Unlock() + + var errs []error + + if unixSrv != nil { + if err := unixSrv.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("debug unix server shutdown: %w", err)) + } + if unixDone != nil { + select { + case <-unixDone: + case <-ctx.Done(): + errs = append(errs, ctx.Err()) + } + } + } + + if httpSrv != nil { + if err := httpSrv.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("debug http server shutdown: %w", err)) + } + if httpDone != nil { + select { + case <-httpDone: + case <-ctx.Done(): + errs = append(errs, ctx.Err()) + } + } + } + + if socketPath != "" { + if err := os.Remove(socketPath); err != nil && !errors.Is(err, os.ErrNotExist) { + errs = append(errs, fmt.Errorf("remove debug socket %q: %w", socketPath, err)) + } + } + + return errors.Join(errs...) +} + // RegisterHandler registers http handler for unix/http debug server func (s *Server) RegisterHandler(method, pattern string, handler func(request *http.Request) (interface{}, error)) { if method == "" { diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 6e723066..e192d0d2 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "log/slog" @@ -24,12 +25,20 @@ const ( serviceName = "executor" ) +// Run starts the command, waits for it to complete, and returns the error. +// The child PID is registered in the global process registry while the process +// is running so that a PID-1 zombie reaper does not steal it. func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) - return cmd.Run() + if err := startAndRegister(cmd); err != nil { + return err + } + defer unregisterPID(cmd.Process.Pid) + + return cmd.Wait() } // StderrError is returned by RunAndLogLines when a command fails and produces @@ -113,7 +122,34 @@ func (e *Executor) Output() ([]byte, error) { e.logger.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), slog.String(pkg.LogKeyDir, e.cmd.Dir)) - return e.cmd.Output() + + // Reproduce cmd.Output() but interleave PID registration so that the + // PID-1 zombie reaper skips this process. + if e.cmd.Stdout != nil { + return nil, errors.New("exec: Stdout already set") + } + var stdout bytes.Buffer + e.cmd.Stdout = &stdout + + captureErr := e.cmd.Stderr == nil + var stderrBuf bytes.Buffer + if captureErr { + e.cmd.Stderr = &stderrBuf + } + + if err := startAndRegister(e.cmd); err != nil { + return nil, err + } + defer unregisterPID(e.cmd.Process.Pid) + + err := e.cmd.Wait() + if err != nil && captureErr { + if ee, ok := err.(*exec.ExitError); ok { + ee.Stderr = stderrBuf.Bytes() + } + } + + return stdout.Bytes(), err } type CmdUsage struct { @@ -154,7 +190,12 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri e.cmd.Stdout = plo e.cmd.Stderr = io.MultiWriter(ple, stdErr) - err := e.cmd.Run() + if err := startAndRegister(e.cmd); err != nil { + return nil, fmt.Errorf("cmd start: %w", err) + } + defer unregisterPID(e.cmd.Process.Pid) + + err := e.cmd.Wait() if err != nil { if len(stdErr.Bytes()) > 0 { return nil, &StderrError{Message: stdErr.String()} diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index ccfc52fb..9b663080 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -3,11 +3,11 @@ package executor import ( "bytes" "context" - json "github.com/flant/shell-operator/pkg/utils/json" "fmt" "io" "math/rand/v2" "os" + "os/exec" "regexp" "strings" "testing" @@ -16,6 +16,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + json "github.com/flant/shell-operator/pkg/utils/json" ) func TestRunAndLogLines(t *testing.T) { @@ -250,3 +252,221 @@ func randStringRunes(n int) string { } return string(b) } + +// newTestRegistry creates a fresh processRegistry for tests, swaps the +// global singleton, and restores the original with t.Cleanup. It returns +// the fresh test registry. +func newTestRegistry(t *testing.T) *processRegistry { + t.Helper() + + r := &processRegistry{activePIDs: make(map[int]struct{})} + orig := registry + registry = r + t.Cleanup(func() { registry = orig }) + + return r +} + +func TestProcessRegistry_Basic(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int]struct{})} + + // Initially empty + assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID") + assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID") + + // Register and check + r.register(42) + assert.True(t, r.IsActive(42), "IsActive should return true for registered PID") + assert.False(t, r.IsActive(43), "IsActive should return false for different PID") + + // Unregister and check + r.unregister(42) + assert.False(t, r.IsActive(42), "IsActive should return false after unregister") +} + +func TestProcessRegistry_DoubleUnregister(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int]struct{})} + + r.register(100) + r.unregister(100) + r.unregister(100) // should not panic + + assert.False(t, r.IsActive(100)) +} + +func TestProcessRegistry_Concurrent(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int]struct{})} + const goroutines = 100 + const pidsPerGoroutine = 100 + + done := make(chan struct{}) + + // Concurrently register PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.register(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be registered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.True(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } + + // Concurrently unregister PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.unregister(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be unregistered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.False(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } +} + +func TestTracker_IsActive(t *testing.T) { + newTestRegistry(t) + tracker := Tracker() + + // PID not registered + assert.False(t, tracker.IsActive(42)) + + // Register via internal helper (same path as executor methods) + registerPID(42) + assert.True(t, tracker.IsActive(42)) + + unregisterPID(42) + assert.False(t, tracker.IsActive(42)) +} + +func TestStartAndRegister_AtomicWithReaper(t *testing.T) { + r := newTestRegistry(t) + + // StartAndRegister must hold the write-lock across both cmd.Start() and + // PID registration, so there is no window where a zombie reaper could + // observe IsActive(pid) == false for a child that cmd.Wait will later reap. + cmd := exec.Command("sleep", "2") + require.NoError(t, startAndRegister(cmd)) + defer cmd.Process.Kill() + + pid := cmd.Process.Pid + + // The PID must already be visible in the registry — no race window. + assert.True(t, r.IsActive(pid), "PID should be registered immediately after StartAndRegister returns") + + // Simulate what the reaper does: check via the ProcessTracker interface. + tracker := Tracker() + assert.True(t, tracker.IsActive(pid), "ProcessTracker must see the PID as active") + + // Clean up: wait for the process to finish after killing it. + _ = cmd.Process.Kill() + _ = cmd.Wait() + + unregisterPID(pid) + assert.False(t, r.IsActive(pid), "PID should be gone after unregister") +} + +func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { + r := newTestRegistry(t) + + ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo hello"}, []string{}) + + outputCh := make(chan []byte, 1) + errCh := make(chan error, 1) + go func() { + output, err := ex.Output() + outputCh <- output + errCh <- err + }() + + assert.Eventually(t, func() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.activePIDs) > 0 + }, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while Output is running") + + output := <-outputCh + err := <-errCh + assert.NoError(t, err) + assert.Contains(t, string(output), "hello") + + r.mu.RLock() + count := len(r.activePIDs) + r.mu.RUnlock() + assert.Empty(t, count, "expected registry to be empty after Output returns") +} + +func TestGlobalRegistry_Output_FailedStart(t *testing.T) { + newTestRegistry(t) + + // Command that doesn't exist — Start() should fail. + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}) + _, err := ex.Output() + assert.Error(t, err) +} + +func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { + r := newTestRegistry(t) + + logger := log.NewLogger() + logger.SetLevel(log.LevelInfo) + + ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo test-output"}, []string{}). + WithLogger(logger) + + usageCh := make(chan *CmdUsage, 1) + errCh := make(chan error, 1) + go func() { + usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + usageCh <- usage + errCh <- err + }() + + assert.Eventually(t, func() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.activePIDs) > 0 + }, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while RunAndLogLines is running") + + usage := <-usageCh + err := <-errCh + assert.NoError(t, err) + assert.NotNil(t, usage) + + r.mu.RLock() + count := len(r.activePIDs) + r.mu.RUnlock() + assert.Empty(t, count, "expected registry to be empty after RunAndLogLines returns") +} + +func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { + newTestRegistry(t) + + logger := log.NewLogger() + + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}). + WithLogger(logger) + + _, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.Error(t, err) +} diff --git a/pkg/executor/registry.go b/pkg/executor/registry.go new file mode 100644 index 00000000..9f5097ca --- /dev/null +++ b/pkg/executor/registry.go @@ -0,0 +1,100 @@ +package executor + +import ( + "os/exec" + "sync" +) + +// ProcessTracker is a read-only view into the process registry. +// It is intended for consumers (such as a PID-1 zombie reaper) that need +// to check whether a PID is managed by the executor but must not modify +// the registry. +type ProcessTracker interface { + // IsActive reports whether pid is currently tracked as a running process. + IsActive(pid int) bool +} + +// processRegistry tracks PIDs of processes started by the executor so that +// a PID-1 zombie reaper can skip them (their parent already calls Wait). +// This prevents the reaper from stealing a child that cmd.Wait expects to reap. +// +// The struct is intentionally unexported — all external access goes through +// the ProcessTracker interface (read-only) or the package-level helpers +// registerPID / unregisterPID (write, executor-internal). +type processRegistry struct { + mu sync.RWMutex + activePIDs map[int]struct{} +} + +// register adds pid to the set of active PIDs. +func (r *processRegistry) register(pid int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.activePIDs[pid] = struct{}{} +} + +// unregister removes pid from the set of active PIDs. +func (r *processRegistry) unregister(pid int) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.activePIDs, pid) +} + +// startAndRegister calls cmd.Start() and, on success, registers the child +// PID under the same write-lock. This eliminates the window between process +// creation and registration during which a PID-1 zombie reaper could +// prematurely reap a fast-exiting child. +func (r *processRegistry) startAndRegister(cmd *exec.Cmd) error { + r.mu.Lock() + defer r.mu.Unlock() + + if err := cmd.Start(); err != nil { + return err + } + + r.activePIDs[cmd.Process.Pid] = struct{}{} + + return nil +} + +// IsActive reports whether pid is currently tracked as an active process. +func (r *processRegistry) IsActive(pid int) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + _, ok := r.activePIDs[pid] + + return ok +} + +// registry is the singleton process registry. +// It is not exported — external packages obtain a ProcessTracker via Tracker(). +var registry = &processRegistry{ + activePIDs: make(map[int]struct{}), +} + +// Tracker returns a read-only view of the global process registry. +// The zombie reaper should call this once and use the returned ProcessTracker +// to check whether a PID is managed by the executor. +func Tracker() ProcessTracker { + return registry +} + +// registerPID and unregisterPID are package-internal helpers used by Run, +// Output, and RunAndLogLines to track child PIDs. +func registerPID(pid int) { + registry.register(pid) +} + +func unregisterPID(pid int) { + registry.unregister(pid) +} + +// startAndRegister calls cmd.Start() and registers the resulting PID +// atomically under the registry's write-lock. Callers must still call +// unregisterPID(cmd.Process.Pid) (typically via defer) when cmd.Wait returns. +func startAndRegister(cmd *exec.Cmd) error { + return registry.startAndRegister(cmd) +} diff --git a/pkg/hook/config/config_v1.go b/pkg/hook/config/config_v1.go index c23f90ea..80d57de7 100644 --- a/pkg/hook/config/config_v1.go +++ b/pkg/hook/config/config_v1.go @@ -443,7 +443,10 @@ func convertValidating(cfgV1 KubernetesAdmissionConfigV1) htypes.ValidatingConfi if cfgV1.FailurePolicy != nil { webhook.FailurePolicy = cfgV1.FailurePolicy } else { - defaultFailurePolicy := v1.FailurePolicyType(admission.DefaultSettings.DefaultFailurePolicy) + // "Fail" matches Kubernetes' own default and what the operator used + // to embed as admission.DefaultSettings.DefaultFailurePolicy. Hooks + // that need a different default should set it explicitly. + defaultFailurePolicy := v1.FailurePolicyType("Fail") webhook.FailurePolicy = &defaultFailurePolicy } diff --git a/pkg/hook/hook_manager_discrete_test.go b/pkg/hook/hook_manager_discrete_test.go index ccdb9336..c61a3946 100644 --- a/pkg/hook/hook_manager_discrete_test.go +++ b/pkg/hook/hook_manager_discrete_test.go @@ -74,10 +74,10 @@ func TestLoadHook_allHooksHaveNonNilConfig(t *testing.T) { // returns an error when the hook binary doesn't exist or fails. func TestFetchHookConfig_returnsErrorForNonExecutable(t *testing.T) { conversionManager := conversion.NewWebhookManager() - conversionManager.Settings = conversion.DefaultSettings + conversionManager.Settings = testConversionSettings() admissionManager := admission.NewWebhookManager(nil) - admissionManager.Settings = admission.DefaultSettings + admissionManager.Settings = testAdmissionSettings() cfg := &ManagerConfig{ WorkingDir: t.TempDir(), diff --git a/pkg/hook/hook_manager_test.go b/pkg/hook/hook_manager_test.go index ef679ff4..1f219be2 100644 --- a/pkg/hook/hook_manager_test.go +++ b/pkg/hook/hook_manager_test.go @@ -19,10 +19,10 @@ func newHookManager(t *testing.T, testdataDir string) *Manager { hooksDir, _ := filepath.Abs(testdataDir) conversionManager := conversion.NewWebhookManager() - conversionManager.Settings = conversion.DefaultSettings + conversionManager.Settings = testConversionSettings() admissionManager := admission.NewWebhookManager(nil) - admissionManager.Settings = admission.DefaultSettings + admissionManager.Settings = testAdmissionSettings() cfg := &ManagerConfig{ WorkingDir: hooksDir, @@ -388,10 +388,10 @@ func Test_HookManager_onstartup_order(t *testing.T) { func Test_ManagerConfig_HookOptions_PropagateToManager(t *testing.T) { conversionManager := conversion.NewWebhookManager() - conversionManager.Settings = conversion.DefaultSettings + conversionManager.Settings = testConversionSettings() admissionManager := admission.NewWebhookManager(nil) - admissionManager.Settings = admission.DefaultSettings + admissionManager.Settings = testAdmissionSettings() cfg := &ManagerConfig{ WorkingDir: t.TempDir(), diff --git a/pkg/hook/testing_helpers_test.go b/pkg/hook/testing_helpers_test.go new file mode 100644 index 00000000..0d5c7f85 --- /dev/null +++ b/pkg/hook/testing_helpers_test.go @@ -0,0 +1,54 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hook + +import ( + "github.com/flant/shell-operator/pkg/webhook/admission" + "github.com/flant/shell-operator/pkg/webhook/conversion" + "github.com/flant/shell-operator/pkg/webhook/server" +) + +// testAdmissionSettings replaces what used to be admission.DefaultSettings. +// Tests construct their own *admission.WebhookSettings instead of relying on +// a package-level singleton; this keeps every test self-contained and lets the +// production code drop the global entirely. +func testAdmissionSettings() *admission.WebhookSettings { + return &admission.WebhookSettings{ + Settings: server.Settings{ + ServerCertPath: "/validating-certs/tls.crt", + ServerKeyPath: "/validating-certs/tls.key", + ServiceName: "shell-operator-validating-svc", + ListenAddr: "0.0.0.0", + ListenPort: "9680", + }, + CAPath: "/validating-certs/ca.crt", + ConfigurationName: "shell-operator-hooks", + DefaultFailurePolicy: "Fail", + } +} + +// testConversionSettings replaces what used to be conversion.DefaultSettings. +func testConversionSettings() *conversion.WebhookSettings { + return &conversion.WebhookSettings{ + Settings: server.Settings{ + ServerCertPath: "/conversion-certs/tls.crt", + ServerKeyPath: "/conversion-certs/tls.key", + ServiceName: "shell-operator-conversion-svc", + ListenAddr: "0.0.0.0", + ListenPort: "9681", + }, + CAPath: "/conversion-certs/ca.crt", + } +} diff --git a/pkg/kube_events_manager/kube_events_manager.go b/pkg/kube_events_manager/kube_events_manager.go index 5019c765..3a86ebcc 100644 --- a/pkg/kube_events_manager/kube_events_manager.go +++ b/pkg/kube_events_manager/kube_events_manager.go @@ -93,6 +93,15 @@ func (mgr *kubeEventsManager) WithMetricStorage(mstor metricsstorage.Storage) { // TODO use Context to stop informers func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error { mgr.logger.Debug("add kubernetes monitor", slog.String(pkg.LogKeyConfig, fmt.Sprintf("%+v", monitorConfig))) + + // Ensure MonitorConfig.Logger is set: resource_informer and + // namespace_informer access this field directly for error-path + // logging. Propagate here (rather than inside NewMonitor) so that + // NewMonitor does not silently mutate a caller-owned struct. + if monitorConfig.Logger == nil { + monitorConfig.Logger = mgr.logger.Named("monitor") + } + mon := NewMonitor( mgr.ctx, mgr.KubeClient, diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index a20d4452..91f6c58d 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -194,7 +194,7 @@ func (m *monitor) CreateInformers() error { if m.Config.NamespaceSelector != nil && m.Config.NamespaceSelector.LabelSelector != nil { logEntry.Debug("Create NamespaceInformer for namespace.labelSelector") - m.NamespaceInformer = NewNamespaceInformer(m.ctx, m.KubeClient, m.Config) + m.NamespaceInformer = NewNamespaceInformer(m.ctx, m.KubeClient, m.Config, m.logger) err := m.NamespaceInformer.createSharedInformer( func(nsName string) { // Added/Modified event: check, create and run informers for Ns diff --git a/pkg/kube_events_manager/namespace_informer.go b/pkg/kube_events_manager/namespace_informer.go index 2c205c0e..5d739566 100644 --- a/pkg/kube_events_manager/namespace_informer.go +++ b/pkg/kube_events_manager/namespace_informer.go @@ -37,9 +37,11 @@ type namespaceInformer struct { addFn func(string) delFn func(string) + + logger *log.Logger } -func NewNamespaceInformer(ctx context.Context, client *klient.Client, monitor *MonitorConfig) *namespaceInformer { +func NewNamespaceInformer(ctx context.Context, client *klient.Client, monitor *MonitorConfig, logger *log.Logger) *namespaceInformer { cctx, cancel := context.WithCancel(ctx) informer := &namespaceInformer{ @@ -48,6 +50,7 @@ func NewNamespaceInformer(ctx context.Context, client *klient.Client, monitor *M KubeClient: client, Monitor: monitor, ExistedObjects: make(map[string]bool), + logger: logger, } return informer } @@ -151,7 +154,7 @@ func (ni *namespaceInformer) start() { if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return ni.SharedInformer.HasSynced(), nil }); err != nil { - ni.Monitor.Logger.Error("Cache is not synced for informer", + ni.logger.Error("Cache is not synced for informer", slog.String(pkg.LogKeyDebugName, ni.Monitor.Metadata.DebugName)) } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 53aaab18..d27a0dc8 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -465,7 +465,7 @@ func (ei *resourceInformer) start() { errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler")) err := ei.factoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) if err != nil { - ei.Monitor.Logger.Error("cache is not synced for informer", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) + ei.logger.Error("cache is not synced for informer", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) return } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 28e3ac3d..abbaa9e5 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -27,7 +27,13 @@ import ( // Metric name variables organized by functional area. // Each variable represents a unique metric name used throughout shell-operator. -// These variables are initialized with prefix replacement at startup. +// These variables are initialized with prefix replacement at startup via +// InitMetrics. InitMetrics is idempotent for the same prefix because the +// substitution removes the {PREFIX} placeholder on the first call. +// +// Library callers that need a per-instance snapshot of the resolved names +// should use NewNames(prefix) and pass the returned *Names around explicitly, +// avoiding the package-level globals altogether. var ( // ============================================================================ // Common Metrics @@ -89,6 +95,76 @@ func ReplacePrefix(metricName, prefix string) string { return strings.ReplaceAll(metricName, "{PREFIX}", prefix) } +// Names is a per-instance snapshot of every metric name shell-operator emits, +// with the {PREFIX} placeholder already substituted. Library consumers that +// embed shell-operator may build one with NewNames(prefix) and reference its +// fields instead of the package-level variables, which avoids any reliance on +// the global InitMetrics call having run with a compatible prefix. +type Names struct { + LiveTicks string + TasksQueueActionDurationSeconds string + TasksQueueLength string + TasksQueueCompactionOperationsTotal string + TasksQueueCompactionTasksByHook string + HookEnableKubernetesBindingsSeconds string + HookEnableKubernetesBindingsErrorsTotal string + HookEnableKubernetesBindingsSuccess string + HookRunSeconds string + HookRunUserCPUSeconds string + HookRunSysCPUSeconds string + HookRunMaxRSSBytes string + HookRunErrorsTotal string + HookRunAllowedErrorsTotal string + HookRunSuccessTotal string + TaskWaitInQueueSecondsTotal string + KubeSnapshotObjects string + KubeJqFilterDurationSeconds string + KubeEventDurationSeconds string + KubernetesClientWatchErrorsTotal string +} + +// NewNames builds a *Names with prefix substituted into every metric template. +// Calling NewNames does not mutate any package-level state, so multiple +// instances with different prefixes can coexist when needed. +func NewNames(prefix string) *Names { + r := func(name string) string { return ReplacePrefix(name, prefix) } + + // Source the templates straight from the package vars but force them back + // to the {PREFIX} form so the resolved prefix wins, regardless of any + // previous InitMetrics call. We do this by re-resolving against the known + // suffix, defaulting to the current var value when there's no placeholder + // match left to recover. + tpl := func(current, suffix string) string { + if strings.Contains(current, "{PREFIX}") { + return r(current) + } + return prefix + suffix + } + + return &Names{ + LiveTicks: tpl(LiveTicks, "live_ticks"), + TasksQueueActionDurationSeconds: tpl(TasksQueueActionDurationSeconds, "tasks_queue_action_duration_seconds"), + TasksQueueLength: tpl(TasksQueueLength, "tasks_queue_length"), + TasksQueueCompactionOperationsTotal: "d8_telemetry_" + prefix + "tasks_queue_compaction_operations_total", + TasksQueueCompactionTasksByHook: "d8_telemetry_" + prefix + "tasks_queue_compaction_tasks_by_hook", + HookEnableKubernetesBindingsSeconds: tpl(HookEnableKubernetesBindingsSeconds, "hook_enable_kubernetes_bindings_seconds"), + HookEnableKubernetesBindingsErrorsTotal: tpl(HookEnableKubernetesBindingsErrorsTotal, "hook_enable_kubernetes_bindings_errors_total"), + HookEnableKubernetesBindingsSuccess: tpl(HookEnableKubernetesBindingsSuccess, "hook_enable_kubernetes_bindings_success"), + HookRunSeconds: tpl(HookRunSeconds, "hook_run_seconds"), + HookRunUserCPUSeconds: tpl(HookRunUserCPUSeconds, "hook_run_user_cpu_seconds"), + HookRunSysCPUSeconds: tpl(HookRunSysCPUSeconds, "hook_run_sys_cpu_seconds"), + HookRunMaxRSSBytes: tpl(HookRunMaxRSSBytes, "hook_run_max_rss_bytes"), + HookRunErrorsTotal: tpl(HookRunErrorsTotal, "hook_run_errors_total"), + HookRunAllowedErrorsTotal: tpl(HookRunAllowedErrorsTotal, "hook_run_allowed_errors_total"), + HookRunSuccessTotal: tpl(HookRunSuccessTotal, "hook_run_success_total"), + TaskWaitInQueueSecondsTotal: tpl(TaskWaitInQueueSecondsTotal, "task_wait_in_queue_seconds_total"), + KubeSnapshotObjects: tpl(KubeSnapshotObjects, "kube_snapshot_objects"), + KubeJqFilterDurationSeconds: tpl(KubeJqFilterDurationSeconds, "kube_jq_filter_duration_seconds"), + KubeEventDurationSeconds: tpl(KubeEventDurationSeconds, "kube_event_duration_seconds"), + KubernetesClientWatchErrorsTotal: tpl(KubernetesClientWatchErrorsTotal, "kubernetes_client_watch_errors_total"), + } +} + // InitMetrics initializes all metric name variables by replacing {PREFIX} placeholders // with the provided prefix. This function should be called once at startup before // registering any metrics. diff --git a/pkg/schedule_manager/schedule_manager.go b/pkg/schedule_manager/schedule_manager.go index 826827cd..9432a0dd 100644 --- a/pkg/schedule_manager/schedule_manager.go +++ b/pkg/schedule_manager/schedule_manager.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "sync" + "time" "github.com/deckhouse/deckhouse/pkg/log" "gopkg.in/robfig/cron.v2" @@ -44,6 +45,8 @@ type scheduleManager struct { ScheduleCh chan string Entries map[string]*CronEntry + stopped chan struct{} + logger *log.Logger mu sync.Mutex } @@ -64,10 +67,23 @@ func NewScheduleManager(ctx context.Context, logger *log.Logger) *scheduleManage return sm } +// Stop cancels the manager context and blocks until the cron is fully stopped. +// Calling Stop without a preceding Start returns immediately. Calling Stop +// multiple times is safe. func (sm *scheduleManager) Stop() { if sm.cancel != nil { sm.cancel() } + sm.mu.Lock() + stopped := sm.stopped + sm.mu.Unlock() + if stopped == nil { + return + } + select { + case <-stopped: + case <-time.After(5 * time.Second): + } } // Add create entry for crontab and id and start scheduled function. @@ -142,9 +158,21 @@ func (sm *scheduleManager) Remove(delEntry smtypes.ScheduleEntry) { } func (sm *scheduleManager) Start() { + sm.mu.Lock() + if sm.stopped != nil { + sm.mu.Unlock() + return + } + stopped := make(chan struct{}) + sm.stopped = stopped + sm.mu.Unlock() + sm.cron.Start() go func() { + defer close(stopped) <-sm.ctx.Done() + // cron.v2 Stop signals the scheduler goroutine to exit. + // Closing stopped lets Stop() callers observe completion. sm.cron.Stop() }() } diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index eb2b4946..48bbb519 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/deckhouse/deckhouse/pkg/log" - metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/config" @@ -22,96 +21,114 @@ import ( webhookserver "github.com/flant/shell-operator/pkg/webhook/server" ) -// Init initializes logging, ensures directories and creates -// a ShellOperator instance with all dependencies. -// cfg must already have all configuration sources merged (NewConfig → ParseEnv → BindFlags → flags parsed). -func Init(ctx context.Context, cfg *app.Config, logger *log.Logger) (*ShellOperator, error) { - // Propagate cfg into package-level globals (e.g. DebugUnixSocket) so - // library consumers that skip BindFlags still get them overridden from cfg. - app.ApplyConfig(cfg) +// NewShellOperator builds a fully assembled, ready-to-Start operator from cfg. +// It supersedes the previous Init + NewShellOperator pair: it owns directory +// validation, metric storage construction, logging setup, debug server, kube +// clients, hook discovery and webhook initialization. +// +// The returned operator is in the "ready" state — call Start to begin +// processing and Shutdown to tear everything down. After Shutdown returns, +// build a fresh instance via NewShellOperator to start over; *ShellOperator +// values are not meant to be reused across Stop/Start cycles. +func NewShellOperator(ctx context.Context, cfg *app.Config, opts ...Option) (*ShellOperator, error) { + if cfg == nil { + return nil, fmt.Errorf("shell-operator: cfg must not be nil") + } - // Initialize webhook settings from merged configuration. - admission.InitFromSettings(admission.WebhookSettings{ - Settings: webhookserver.Settings{ - ServerCertPath: cfg.Admission.ServerCert, - ServerKeyPath: cfg.Admission.ServerKey, - ClientCAPaths: cfg.Admission.ClientCA, - ServiceName: cfg.Admission.ServiceName, - ListenAddr: cfg.Admission.ListenAddress, - ListenPort: cfg.Admission.ListenPort, - }, - CAPath: cfg.Admission.CA, - ConfigurationName: cfg.Admission.ConfigurationName, - DefaultFailurePolicy: cfg.Admission.FailurePolicy, - }) - conversion.InitFromSettings(conversion.WebhookSettings{ - Settings: webhookserver.Settings{ - ServerCertPath: cfg.Conversion.ServerCert, - ServerKeyPath: cfg.Conversion.ServerKey, - ClientCAPaths: cfg.Conversion.ClientCA, - ServiceName: cfg.Conversion.ServiceName, - ListenAddr: cfg.Conversion.ListenAddress, - ListenPort: cfg.Conversion.ListenPort, - }, - CAPath: cfg.Conversion.CA, - }) + bare := newBareShellOperator(ctx, opts...) + + runtimeConfig := config.NewConfig(bare.logger) + app.SetupLogging(cfg.Log.Level, runtimeConfig, bare.logger) - runtimeConfig := config.NewConfig(logger) - // Init logging subsystem. - app.SetupLogging(cfg.Log.Level, runtimeConfig, logger) + // Ensure metric names use cfg's prefix. metrics.InitMetrics is idempotent + // for the same prefix (the {PREFIX} placeholder is replaced once) so + // re-instantiating with the same prefix is safe. We also build a + // per-instance *metrics.Names snapshot so library consumers have a + // stable accessor that does not depend on the package-level globals. + metrics.InitMetrics(cfg.App.PrometheusMetricsPrefix) + bare.MetricNames = metrics.NewNames(cfg.App.PrometheusMetricsPrefix) - // Log version and jq filtering implementation. - logger.Info(app.AppStartMessage) + bare.logger.Info(fmt.Sprintf("%s %s", app.AppName, app.Version)) fl := jq.NewFilter() - logger.Debug(fl.FilterInfo()) + bare.logger.Debug(fl.FilterInfo()) hooksDir, err := utils.RequireExistingDirectory(cfg.App.HooksDir) if err != nil { - logger.Log(ctx, log.LevelFatal.Level(), "hooks directory is required", log.Err(err)) - return nil, err + return nil, fmt.Errorf("hooks directory: %w", err) } tempDir, err := utils.EnsureTempDirectory(cfg.App.TempDir) if err != nil { - logger.Log(ctx, log.LevelFatal.Level(), "temp directory", log.Err(err)) - return nil, err + return nil, fmt.Errorf("temp directory: %w", err) } - ms := metricsstorage.NewMetricStorage( - metricsstorage.WithLogger(logger.Named("metric-storage")), - ) - - hms := metricsstorage.NewMetricStorage( - metricsstorage.WithNewRegistry(), - metricsstorage.WithLogger(logger.Named("hook-metric-storage")), - ) - - op := NewShellOperator(ctx, ms, hms, WithLogger(logger)) - - // Debug server. - debugServer, err := RunDefaultDebugServer(cfg.Debug.UnixSocket, cfg.Debug.HTTPServerAddr, op.logger.Named("debug-server")) + // Debug server is started immediately so a crash before Start still + // leaves diagnostics reachable; Shutdown tears it down. + debugServer, err := RunDefaultDebugServer(cfg.Debug.UnixSocket, cfg.Debug.HTTPServerAddr, bare.logger.Named("debug-server")) if err != nil { - logger.Log(ctx, log.LevelFatal.Level(), "start Debug server", log.Err(err)) - return nil, err + return nil, fmt.Errorf("start debug server: %w", err) } + bare.DebugServer = debugServer - err = op.AssembleCommonOperatorFromConfig(cfg, []string{ + if err := bare.AssembleCommonOperatorFromConfig(cfg, []string{ "hook", "binding", "queue", - }) - if err != nil { - logger.Log(ctx, log.LevelFatal.Level(), "essemble common operator", log.Err(err)) - return nil, err + }); err != nil { + // Best-effort cleanup of the just-started debug server. + _ = debugServer.Shutdown(ctx) + return nil, fmt.Errorf("assemble common operator: %w", err) } - err = op.assembleShellOperator(cfg, hooksDir, tempDir, debugServer, runtimeConfig) - if err != nil { - logger.Log(ctx, log.LevelFatal.Level(), "essemble shell operator", log.Err(err)) - return nil, err + if err := bare.assembleShellOperator(cfg, hooksDir, tempDir, debugServer, runtimeConfig); err != nil { + _ = debugServer.Shutdown(ctx) + return nil, fmt.Errorf("assemble shell operator: %w", err) } - return op, nil + return bare, nil +} + +// Init is a thin wrapper kept for compatibility with code that still calls +// the previous bootstrap entry point. New code should call NewShellOperator +// directly. +// +// Deprecated: use NewShellOperator(ctx, cfg, WithLogger(logger)) instead. +func Init(ctx context.Context, cfg *app.Config, logger *log.Logger) (*ShellOperator, error) { + return NewShellOperator(ctx, cfg, WithLogger(logger)) +} + +// admissionSettingsFromConfig builds a *admission.WebhookSettings directly +// from cfg without touching any package-level globals. +func admissionSettingsFromConfig(cfg *app.Config) *admission.WebhookSettings { + return &admission.WebhookSettings{ + Settings: webhookserver.Settings{ + ServerCertPath: cfg.Admission.ServerCert, + ServerKeyPath: cfg.Admission.ServerKey, + ClientCAPaths: cfg.Admission.ClientCA, + ServiceName: cfg.Admission.ServiceName, + ListenAddr: cfg.Admission.ListenAddress, + ListenPort: cfg.Admission.ListenPort, + }, + CAPath: cfg.Admission.CA, + ConfigurationName: cfg.Admission.ConfigurationName, + DefaultFailurePolicy: cfg.Admission.FailurePolicy, + } +} + +// conversionSettingsFromConfig builds a *conversion.WebhookSettings directly +// from cfg without touching any package-level globals. +func conversionSettingsFromConfig(cfg *app.Config) *conversion.WebhookSettings { + return &conversion.WebhookSettings{ + Settings: webhookserver.Settings{ + ServerCertPath: cfg.Conversion.ServerCert, + ServerKeyPath: cfg.Conversion.ServerKey, + ClientCAPaths: cfg.Conversion.ClientCA, + ServiceName: cfg.Conversion.ServiceName, + ListenAddr: cfg.Conversion.ListenAddress, + ListenPort: cfg.Conversion.ListenPort, + }, + CAPath: cfg.Conversion.CA, + } } // AssembleCommonOperatorFromConfig is the recommended assembly entry point for @@ -183,7 +200,7 @@ func kubeClientConfigsFromAppConfig(cfg *app.Config) (KubeClientConfig, KubeClie // For library consumers that already hold an *app.Config, prefer // AssembleCommonOperatorFromConfig instead of unpacking fields by hand. func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string, kubeEventsManagerLabels []string, mainKubeCfg, patcherKubeCfg KubeClientConfig) error { - op.APIServer = newBaseHTTPServer(listenAddress, listenPort) + op.APIServer = newBaseHTTPServer(listenAddress, listenPort, op.logger.Named("api-server")) // built-in metrics err := op.setupMetricStorage(kubeEventsManagerLabels) @@ -278,20 +295,19 @@ func (op *ShellOperator) SetupEventManagers() { op.ManagerEventsHandler = newManagerEventsHandler(op.ctx, cfg) } -// setupHookManagers instantiates different hook managers. +// setupHookManagers instantiates different hook managers. Settings for the +// admission and conversion webhook managers are derived directly from cfg — +// there are no package-level singletons to populate. func (op *ShellOperator) setupHookManagers(cfg *app.Config, hooksDir string, tempDir string) { - // Initialize admission webhooks manager. op.AdmissionWebhookManager = admission.NewWebhookManager(op.KubeClient, admission.WithLogger(op.logger.Named("admission-webhook-manager"))) - op.AdmissionWebhookManager.Settings = admission.DefaultSettings + op.AdmissionWebhookManager.Settings = admissionSettingsFromConfig(cfg) op.AdmissionWebhookManager.Namespace = cfg.App.Namespace - // Initialize conversion webhooks manager. op.ConversionWebhookManager = conversion.NewWebhookManager(conversion.WithLogger(op.logger.Named("conversion-webhook-manager"))) op.ConversionWebhookManager.KubeClient = op.KubeClient - op.ConversionWebhookManager.Settings = conversion.DefaultSettings + op.ConversionWebhookManager.Settings = conversionSettingsFromConfig(cfg) op.ConversionWebhookManager.Namespace = cfg.App.Namespace - // Initialize Hook manager. hookCfg := &hook.ManagerConfig{ WorkingDir: hooksDir, TempDir: tempDir, diff --git a/pkg/shell-operator/hook_task_factory.go b/pkg/shell-operator/hook_task_factory.go index e87eba5b..0ac6a4fb 100644 --- a/pkg/shell-operator/hook_task_factory.go +++ b/pkg/shell-operator/hook_task_factory.go @@ -68,11 +68,9 @@ func (HookTaskFactory) NewSyncHookRunTask(h *hook.Hook, info controller.BindingE WithCompactionID(h.Name) } -// globalHookTaskFactory is the package-level factory used by operator event handlers. -var globalHookTaskFactory HookTaskFactory - // newHookRunTaskNow is a convenience wrapper that also stamps WithQueuedAt(time.Now()). +// It uses a zero-value HookTaskFactory because the factory carries no state. func newHookRunTaskNow(hookName string, bindingType types.BindingType, info controller.BindingExecutionInfo, logLabels map[string]string) task.Task { - return globalHookTaskFactory.NewHookRunTask(hookName, bindingType, info, logLabels). + return HookTaskFactory{}.NewHookRunTask(hookName, bindingType, info, logLabels). WithQueuedAt(time.Now()) } diff --git a/pkg/shell-operator/http_server.go b/pkg/shell-operator/http_server.go index 126a4718..12f6da6d 100644 --- a/pkg/shell-operator/http_server.go +++ b/pkg/shell-operator/http_server.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "strings" + "sync" "time" "github.com/deckhouse/deckhouse/pkg/log" @@ -22,40 +23,83 @@ type baseHTTPServer struct { address string port string + + mu sync.Mutex + srv *http.Server + doneCh chan struct{} + logger *log.Logger } -// Start runs http server -func (bhs *baseHTTPServer) Start(ctx context.Context) { +// Start runs the http server in a background goroutine. It returns +// immediately. Lifecycle errors (failure to listen, etc.) are reported +// asynchronously via the server logger; for graceful stop callers must use +// Shutdown(ctx). The ctx argument is retained for backwards compatibility but +// is not used for shutdown anymore — see Shutdown. +func (bhs *baseHTTPServer) Start(_ context.Context) { srv := &http.Server{ Addr: bhs.address + ":" + bhs.port, Handler: bhs.router, ReadTimeout: 90 * time.Second, WriteTimeout: 90 * time.Second, } + doneCh := make(chan struct{}) + + bhs.mu.Lock() + bhs.srv = srv + bhs.doneCh = doneCh + bhs.mu.Unlock() + + logger := bhs.serverLogger() go func() { + defer close(doneCh) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal("base http server listen", log.Err(err)) + logger.Error("base http server stopped with error", log.Err(err)) } }() - log.Info("base http server started", + + logger.Info("base http server started", slog.String(pkg.LogKeyAddress, bhs.address), slog.String(pkg.LogKeyPort, bhs.port)) +} - go func() { - <-ctx.Done() - log.Info("base http server stopped") - - cctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer func() { - // extra handling here - cancel() - }() +// Shutdown gracefully stops the http server. It is safe to call multiple times +// and when Start was never invoked — both return nil. +func (bhs *baseHTTPServer) Shutdown(ctx context.Context) error { + bhs.mu.Lock() + srv := bhs.srv + doneCh := bhs.doneCh + bhs.srv = nil + bhs.mu.Unlock() + + if srv == nil { + return nil + } - if err := srv.Shutdown(cctx); err != nil { - log.Fatal("base http server shutdown failed", log.Err(err)) + if err := srv.Shutdown(ctx); err != nil { + return fmt.Errorf("base http server shutdown: %w", err) + } + if doneCh != nil { + select { + case <-doneCh: + case <-ctx.Done(): + return ctx.Err() } - }() + } + bhs.serverLogger().Info("base http server stopped") + return nil +} + +// serverLogger returns the per-instance logger if set, falling back to the +// package default so older constructors keep working. +func (bhs *baseHTTPServer) serverLogger() *log.Logger { + bhs.mu.Lock() + l := bhs.logger + bhs.mu.Unlock() + if l != nil { + return l + } + return log.NewLogger() } // RegisterRoute register http.HandlerFunc @@ -75,16 +119,14 @@ func (bhs *baseHTTPServer) RegisterRoute(method, pattern string, h http.HandlerF } } -func newBaseHTTPServer(address, port string) *baseHTTPServer { +func newBaseHTTPServer(address, port string, logger *log.Logger) *baseHTTPServer { router := chi.NewRouter() - // inject pprof router.Mount("/debug", middleware.Profiler()) router.Get("/discovery", func(writer http.ResponseWriter, _ *http.Request) { buf := bytes.NewBuffer(nil) walkFn := func(method string, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error { - // skip pprof routes if strings.HasPrefix(route, "/debug/") { return nil } @@ -108,6 +150,7 @@ func newBaseHTTPServer(address, port string) *baseHTTPServer { router: router, address: address, port: port, + logger: logger, } return srv diff --git a/pkg/shell-operator/lifecycle_test.go b/pkg/shell-operator/lifecycle_test.go new file mode 100644 index 00000000..37891b09 --- /dev/null +++ b/pkg/shell-operator/lifecycle_test.go @@ -0,0 +1,200 @@ +// Copyright 2025 Flant JSC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shell_operator + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/deckhouse/deckhouse/pkg/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +// pickFreePort returns an OS-assigned free TCP port as a string. The listener +// is closed before the port is returned, so a brief race with another process +// is possible — acceptable for in-process restart tests. +func pickFreePort(t *testing.T) string { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer l.Close() + _, portStr, err := net.SplitHostPort(l.Addr().String()) + require.NoError(t, err) + if _, err := strconv.Atoi(portStr); err != nil { + t.Fatalf("unexpected port %q: %v", portStr, err) + } + return portStr +} + +// buildMinimalOperator builds a ShellOperator with just the APIServer wired up. +// All other components (HookManager, ManagerEventsHandler, KubeEventsManager, +// ScheduleManager, webhook managers, debug server) stay nil so we can exercise +// the lifecycle without a real Kubernetes cluster. +func buildMinimalOperator(t *testing.T, addr, port string) *ShellOperator { + t.Helper() + op := NewBareShellOperator(context.Background(), WithLogger(log.NewNop())) + op.APIServer = newBaseHTTPServer(addr, port, log.NewNop()) + op.APIServer.RegisterRoute(http.MethodGet, "/healthz", + func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + return op +} + +// waitForServer polls /healthz until the server answers or the deadline elapses. +func waitForServer(t *testing.T, port string, deadline time.Duration) { + t.Helper() + client := &http.Client{Timeout: 200 * time.Millisecond} + url := fmt.Sprintf("http://127.0.0.1:%s/healthz", port) + end := time.Now().Add(deadline) + for time.Now().Before(end) { + resp, err := client.Get(url) + if err == nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return + } + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("server on port %s never became ready", port) +} + +// TestShutdownStopsAllGoroutines verifies that Start spawns a known set of +// goroutines and Shutdown reliably joins them all. We snapshot goroutines +// before Start and ask goleak to verify the same baseline after Shutdown. +func TestShutdownStopsAllGoroutines(t *testing.T) { + defer goleak.VerifyNone(t, + // metric storage registers a prometheus collector that may keep an + // internal goroutine alive between tests; ignore the well-known + // background routines that are not owned by ShellOperator. + goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), + ) + + op := buildMinimalOperator(t, "127.0.0.1", pickFreePort(t)) + + require.NoError(t, op.Start(context.Background())) + waitForServer(t, op.APIServer.port, 2*time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, op.Shutdown(ctx)) +} + +// TestRestartInSameProcess starts op1, shuts it down, and then starts op2 on +// the same port. The second Start must succeed — proof that the first +// instance released its listener cleanly. +func TestRestartInSameProcess(t *testing.T) { + port := pickFreePort(t) + + op1 := buildMinimalOperator(t, "127.0.0.1", port) + require.NoError(t, op1.Start(context.Background())) + waitForServer(t, port, 2*time.Second) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, op1.Shutdown(shutdownCtx)) + + op2 := buildMinimalOperator(t, "127.0.0.1", port) + require.NoError(t, op2.Start(context.Background())) + waitForServer(t, port, 2*time.Second) + + shutdownCtx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + require.NoError(t, op2.Shutdown(shutdownCtx2)) +} + +// TestShutdownIdempotent verifies that calling Shutdown twice is safe and +// the second call is a no-op that returns the original error (nil here). +func TestShutdownIdempotent(t *testing.T) { + op := buildMinimalOperator(t, "127.0.0.1", pickFreePort(t)) + require.NoError(t, op.Start(context.Background())) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, op.Shutdown(ctx)) + + // Second call: shutdownOnce should suppress all the underlying work and + // just return the cached error. + require.NoError(t, op.Shutdown(ctx)) + + // And once shutdownOnce has fired, a third call from another goroutine + // must also be safe. + var wg sync.WaitGroup + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = op.Shutdown(ctx) + }() + } + wg.Wait() +} + +// blockingShutdowner pretends to be a debug server whose Shutdown never +// returns. It lets us prove that ShellOperator.Shutdown honors ctx deadlines +// instead of blocking forever on a misbehaving dependency. +type blockingShutdowner struct{} + +func (blockingShutdowner) Shutdown(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() +} + +// TestShutdownHonorsContextDeadline plugs in a DebugServer that blocks on +// ctx.Done and asserts that Shutdown still returns within the deadline, +// reporting the wrapped timeout error. +func TestShutdownHonorsContextDeadline(t *testing.T) { + op := buildMinimalOperator(t, "127.0.0.1", pickFreePort(t)) + op.DebugServer = blockingShutdowner{} + + require.NoError(t, op.Start(context.Background())) + + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + + start := time.Now() + err := op.Shutdown(ctx) + elapsed := time.Since(start) + + require.Error(t, err, "expected a timeout error") + assert.Contains(t, strings.ToLower(err.Error()), "debug server") + // We don't pin the bound tight because CI machines vary, but the call + // must not block for an order of magnitude beyond the deadline. + assert.Less(t, elapsed, 2*time.Second, "shutdown took too long: %s", elapsed) +} + +// TestStartIsIdempotent ensures Start can be called multiple times without +// double-binding the listener or spawning extra goroutines. The first call +// wins; subsequent calls return the cached error (nil here). +func TestStartIsIdempotent(t *testing.T) { + op := buildMinimalOperator(t, "127.0.0.1", pickFreePort(t)) + require.NoError(t, op.Start(context.Background())) + require.NoError(t, op.Start(context.Background())) + require.NoError(t, op.Start(context.Background())) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, op.Shutdown(ctx)) +} diff --git a/pkg/shell-operator/manager_events_handler.go b/pkg/shell-operator/manager_events_handler.go index 38f55c92..90c81aa1 100644 --- a/pkg/shell-operator/manager_events_handler.go +++ b/pkg/shell-operator/manager_events_handler.go @@ -2,6 +2,8 @@ package shell_operator import ( "context" + "sync" + "sync/atomic" "github.com/deckhouse/deckhouse/pkg/log" @@ -33,6 +35,10 @@ type ManagerEventsHandler struct { taskQueues *queue.TaskQueueSet + startOnce sync.Once + started atomic.Bool + doneCh chan struct{} + logger *log.Logger } @@ -45,6 +51,7 @@ func newManagerEventsHandler(ctx context.Context, cfg *managerEventsHandlerConfi scheduleManager: cfg.smgr, kubeEventsManager: cfg.mgr, taskQueues: cfg.tqs, + doneCh: make(chan struct{}), logger: cfg.logger, } } @@ -61,32 +68,58 @@ func (m *ManagerEventsHandler) WithScheduleEventHandler(fn func(ctx context.Cont m.scheduleCb = fn } -// Start runs events handler. This function is used in addon-operator +// Start runs the events handler. Calling Start a second time is a no-op; +// the original loop keeps running until Stop is called. func (m *ManagerEventsHandler) Start() { - go func() { - for { - var tailTasks []task.Task + m.startOnce.Do(func() { + m.started.Store(true) + go func() { + defer close(m.doneCh) logEntry := m.logger.With(pkg.LogKeyOperatorComponent, "handleEvents") - ctx := context.Background() + for { + var tailTasks []task.Task - select { - case crontab := <-m.scheduleManager.Ch(): - if m.scheduleCb != nil { - tailTasks = m.scheduleCb(ctx, crontab) - } + select { + case crontab := <-m.scheduleManager.Ch(): + if m.scheduleCb != nil { + tailTasks = m.scheduleCb(m.ctx, crontab) + } + + case kubeEvent := <-m.kubeEventsManager.Ch(): + if m.kubeEventCb != nil { + tailTasks = m.kubeEventCb(m.ctx, kubeEvent) + } - case kubeEvent := <-m.kubeEventsManager.Ch(): - if m.kubeEventCb != nil { - tailTasks = m.kubeEventCb(ctx, kubeEvent) + case <-m.ctx.Done(): + logEntry.Info("Stop") + return } - case <-m.ctx.Done(): - logEntry.Info("Stop") - return + m.taskQueues.AddTailTasks(tailTasks...) } + }() + }) +} - m.taskQueues.AddTailTasks(tailTasks...) - } - }() +// Stop signals the events handler loop to exit. Stop is non-blocking; pair it +// with Wait when you need a synchronous teardown. +func (m *ManagerEventsHandler) Stop() { + if m.cancel != nil { + m.cancel() + } +} + +// Wait blocks until the events handler loop has exited or ctx is canceled. +// When Start was never called, Wait returns immediately. +func (m *ManagerEventsHandler) Wait(ctx context.Context) error { + if !m.started.Load() { + return nil + } + select { + case <-m.doneCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index f31028f3..d68bda1a 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -16,8 +16,10 @@ package shell_operator import ( "context" + "errors" "fmt" "log/slog" + "sync" "time" "github.com/deckhouse/deckhouse/pkg/log" @@ -49,7 +51,9 @@ const ( serviceName = "shell-operator" ) -var WaitQueuesTimeout = time.Second * 10 +// DefaultShutdownTimeout bounds how long Shutdown waits for queues, informers +// and HTTP servers to drain when the caller's context has no deadline. +const DefaultShutdownTimeout = 30 * time.Second type ShellOperator struct { ctx context.Context @@ -80,9 +84,41 @@ type ShellOperator struct { AdmissionWebhookManager *admission.WebhookManager ConversionWebhookManager *conversion.WebhookManager + // DebugServer is optional. When non-nil, Shutdown stops it. The assembly + // path in Init() populates it with the default debug server. + DebugServer DebugShutdowner + // taskHandlerRegistry maps task types to their executor functions. // Extenders (addon-operator) may register additional task types. taskHandlerRegistry *TaskHandlerRegistry + + // taskFactory builds HookRun tasks. It carries no state and is exposed + // as a field so extenders can substitute a custom factory if needed. + taskFactory HookTaskFactory + + // MetricNames is a per-instance snapshot of resolved metric names. + // Library callers may consult it to avoid relying on the package-level + // metrics globals. NewShellOperator populates it from cfg's prefix; the + // bare constructor leaves it nil — callers can fill it later via + // metrics.NewNames(prefix) if needed. + MetricNames *metrics.Names + + // wg tracks goroutines spawned by Start (runMetrics, etc.) so Shutdown + // can wait for them to drain. + wg sync.WaitGroup + + // startOnce / shutdownOnce make Start and Shutdown idempotent. + startOnce sync.Once + startErr error + shutdownOnce sync.Once + shutdownErr error +} + +// DebugShutdowner is the subset of *debug.Server that ShellOperator needs at +// shutdown time. Using an interface avoids importing pkg/debug into operator.go +// and keeps the debug server optional. +type DebugShutdowner interface { + Shutdown(ctx context.Context) error } type Option func(operator *ShellOperator) @@ -93,7 +129,38 @@ func WithLogger(logger *log.Logger) Option { } } -func NewShellOperator(ctx context.Context, metricsStorage, hookMetricStorage metricsstorage.Storage, opts ...Option) *ShellOperator { +// WithMetricStorage overrides the default per-instance MetricStorage. Most +// callers should not need this; pass it only when sharing the registry with +// an outer program. +func WithMetricStorage(ms metricsstorage.Storage) Option { + return func(operator *ShellOperator) { + operator.MetricStorage = ms + } +} + +// WithHookMetricStorage overrides the default per-instance HookMetricStorage. +func WithHookMetricStorage(ms metricsstorage.Storage) Option { + return func(operator *ShellOperator) { + operator.HookMetricStorage = ms + } +} + +// NewBareShellOperator constructs an empty *ShellOperator without wiring any +// of the heavy components (HookManager, webhook managers, debug server, etc.). +// It is intended for tests and downstream tooling that need to assemble a +// subset of the operator manually; production code should call +// NewShellOperator(ctx, cfg, ...) instead. +// +// The returned operator has only context, logger, MetricStorage and +// HookMetricStorage initialized. All other fields are nil and the caller is +// responsible for populating them before Start. +func NewBareShellOperator(ctx context.Context, opts ...Option) *ShellOperator { + return newBareShellOperator(ctx, opts...) +} + +// newBareShellOperator is the internal implementation shared with the public +// constructor and the test-only NewBareShellOperator. +func newBareShellOperator(ctx context.Context, opts ...Option) *ShellOperator { cctx, cancel := context.WithCancel(ctx) so := &ShellOperator{ @@ -109,49 +176,179 @@ func NewShellOperator(ctx context.Context, metricsStorage, hookMetricStorage met so.logger = log.NewLogger().Named("shell-operator") } - so.MetricStorage = metricsStorage - so.HookMetricStorage = hookMetricStorage - if so.MetricStorage == nil { - so.logger.Warn("MetricStorage is not provided, create a new default one") - so.MetricStorage = metricsstorage.NewMetricStorage() + so.MetricStorage = metricsstorage.NewMetricStorage( + metricsstorage.WithLogger(so.logger.Named("metric-storage")), + ) } if so.HookMetricStorage == nil { - so.logger.Warn("HookMetricStorage is not provided, create a new default one") - so.HookMetricStorage = metricsstorage.NewMetricStorage(metricsstorage.WithNewRegistry()) + so.HookMetricStorage = metricsstorage.NewMetricStorage( + metricsstorage.WithNewRegistry(), + metricsstorage.WithLogger(so.logger.Named("hook-metric-storage")), + ) } return so } -// Start run the operator -func (op *ShellOperator) Start() { - op.logger.Info("start shell-operator") +// Start runs the operator: opens listeners, primes queues, spawns the metric +// and event goroutines. It is non-blocking. Calling Start twice is a no-op; +// the first call's error is returned to subsequent callers. +func (op *ShellOperator) Start(_ context.Context) error { + op.startOnce.Do(func() { + op.logger.Info("start shell-operator") + + if op.APIServer != nil { + op.APIServer.Start(op.ctx) + } + + if op.TaskQueues != nil { + op.bootstrapMainQueue(op.TaskQueues) + op.TaskQueues.StartMain(op.ctx) + op.initAndStartHookQueues() + } + + // Start emit "live" metrics. Goroutines tracked via op.wg so Shutdown + // can wait for them. + op.runMetrics() - op.APIServer.Start(op.ctx) + // Managers generate events. Start the demux loop before informers so + // no early Kubernetes event is dropped (#42). + if op.ManagerEventsHandler != nil { + op.ManagerEventsHandler.Start() + } - // Create 'main' queue and add onStartup tasks and enable bindings tasks. - op.bootstrapMainQueue(op.TaskQueues) - // Start main task queue handler - op.TaskQueues.StartMain(op.ctx) - op.initAndStartHookQueues() + if op.ScheduleManager != nil { + op.ScheduleManager.Start() + } + }) + return op.startErr +} - // Start emit "live" metrics - op.runMetrics() +// Run is a convenience that Start()s the operator, blocks until ctx is +// canceled, then performs a synchronous Shutdown with a derived timeout. Use +// it from main() and tests; library consumers that want fine-grained control +// should call Start and Shutdown directly. +func (op *ShellOperator) Run(ctx context.Context) error { + if err := op.Start(ctx); err != nil { + return err + } + <-ctx.Done() - // Managers are generating events. This go-routine handles all events and converts them into queued tasks. - // Start it before start all informers to catch all kubernetes events (#42) - op.ManagerEventsHandler.Start() + shutdownCtx, cancel := context.WithTimeout(context.Background(), DefaultShutdownTimeout) + defer cancel() + return op.Shutdown(shutdownCtx) +} - // Unlike KubeEventsManager, ScheduleManager has one go-routine. - op.ScheduleManager.Start() +// Shutdown gracefully tears down everything started by Start. It is +// synchronous: when Shutdown returns, no goroutine spawned by the operator is +// still running and every TCP port / unix socket / file descriptor opened +// during Start has been released. Subsequent calls return the original +// shutdown error. +// +// The supplied ctx bounds how long Shutdown waits for in-flight queues, +// informers and HTTP servers; if it has no deadline DefaultShutdownTimeout +// is applied automatically. +func (op *ShellOperator) Shutdown(ctx context.Context) error { + op.shutdownOnce.Do(func() { + op.shutdownErr = op.doShutdown(ctx) + }) + return op.shutdownErr } -func (op *ShellOperator) Stop() { +func (op *ShellOperator) doShutdown(ctx context.Context) error { + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, DefaultShutdownTimeout) + defer cancel() + } + + op.logger.Info("shutdown begin", slog.String(pkg.LogKeyPhase, "shutdown")) + + // Cancel the operator-scoped context first. This signals every + // ctx-aware goroutine (event handler loop, queue workers, schedule + // helper) to start unwinding. if op.cancel != nil { op.cancel() } + + var errs []error + + if op.ScheduleManager != nil { + op.ScheduleManager.Stop() + op.logger.Info("schedule manager stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + } + + if op.ManagerEventsHandler != nil { + op.ManagerEventsHandler.Stop() + if err := op.ManagerEventsHandler.Wait(ctx); err != nil { + errs = append(errs, fmt.Errorf("manager events handler: %w", err)) + } + op.logger.Info("manager events handler stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + } + + if op.KubeEventsManager != nil { + op.KubeEventsManager.Stop() + op.logger.Info("waiting informers", slog.String(pkg.LogKeyPhase, "shutdown")) + op.KubeEventsManager.Wait() + op.logger.Info("informers stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + } + + if op.TaskQueues != nil { + if err := op.TaskQueues.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("task queues: %w", err)) + } else { + op.logger.Info("task queues stopped", slog.String(pkg.LogKeyPhase, "shutdown")) + } + } + + if op.AdmissionWebhookManager != nil { + if err := op.AdmissionWebhookManager.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("admission webhook: %w", err)) + } + } + + if op.ConversionWebhookManager != nil { + if err := op.ConversionWebhookManager.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("conversion webhook: %w", err)) + } + } + + if op.DebugServer != nil { + if err := op.DebugServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("debug server: %w", err)) + } + } + + if op.APIServer != nil { + if err := op.APIServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("api server: %w", err)) + } + } + + // Wait for runMetrics and other tracked goroutines. + if err := waitWG(ctx, &op.wg); err != nil { + errs = append(errs, fmt.Errorf("operator goroutines: %w", err)) + } + + op.logger.Info("shutdown done", slog.String(pkg.LogKeyPhase, "shutdown")) + return errors.Join(errs...) +} + +// waitWG blocks until wg becomes zero or ctx is canceled. +func waitWG(ctx context.Context, wg *sync.WaitGroup) error { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // initHookManager load hooks from HooksDir and defines event handlers that emit tasks. @@ -227,10 +424,10 @@ func (op *ShellOperator) initValidatingWebhookManager() error { } // Delegate to the dedicated named type instead of an inlined closure. - admissionHandler := NewAdmissionEventHandler(op.HookManager, op.taskHandler, op.logger) + admissionHandler := NewAdmissionEventHandler(op.HookManager, op.taskHandler, op.taskFactory, op.logger) op.AdmissionWebhookManager.WithAdmissionEventHandler(admissionHandler.Handle) - if err := op.AdmissionWebhookManager.Start(); err != nil { + if err := op.AdmissionWebhookManager.Start(op.ctx); err != nil { return fmt.Errorf("ValidatingWebhookManager start: %w", err) } @@ -250,7 +447,7 @@ func (op *ShellOperator) initConversionWebhookManager() error { } // Assign the dedicated handler type instead of an inlined method. - conversionHandler := NewConversionEventHandler(op.HookManager, op.taskHandler, op.logger) + conversionHandler := NewConversionEventHandler(op.HookManager, op.taskHandler, op.taskFactory, op.logger) op.ConversionWebhookManager.EventHandlerFn = conversionHandler.Handle err := op.ConversionWebhookManager.Init() @@ -373,7 +570,7 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, // Run hook for each binding with Synchronization binding context. Ignore queue name here, execute in main queue. err := taskHook.HookController.HandleEnableKubernetesBindings(ctx, func(info controller.BindingExecutionInfo) { - hookRunTasks = append(hookRunTasks, globalHookTaskFactory.NewSyncHookRunTask(taskHook, info, hookLogLabels)) + hookRunTasks = append(hookRunTasks, op.taskFactory.NewSyncHookRunTask(taskHook, info, hookLogLabels)) }) success := 0.0 @@ -422,9 +619,10 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que return queue.TaskResult{Status: queue.Fail} } - err := taskHook.RateLimitWait(context.Background()) + err := taskHook.RateLimitWait(ctx) if err != nil { - // This could happen when the Context is canceled, so just repeat the task until the queue is stopped. + // Context cancellation (e.g. shutdown) lands here; repeat so a future + // task handler tick can decide whether to keep going. return queue.TaskResult{ Status: queue.Repeat, } @@ -792,30 +990,53 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { } } +// runMetrics spawns two long-running metric goroutines. Both honor op.ctx and +// are tracked by op.wg so Shutdown waits for them to finish. func (op *ShellOperator) runMetrics() { if op.MetricStorage == nil { return } - // live ticks. - go func() { - for { + op.wg.Add(1) + go op.runLiveTicks() + + if op.TaskQueues != nil { + op.wg.Add(1) + go op.runQueueLengthMetric() + } +} + +func (op *ShellOperator) runLiveTicks() { + defer op.wg.Done() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + // Emit one tick immediately so the metric appears before the first interval. + op.MetricStorage.CounterAdd(metrics.LiveTicks, 1.0, map[string]string{}) + for { + select { + case <-op.ctx.Done(): + return + case <-ticker.C: op.MetricStorage.CounterAdd(metrics.LiveTicks, 1.0, map[string]string{}) - time.Sleep(10 * time.Second) } - }() + } +} - // task queue length - go func() { - for { - op.TaskQueues.IterateSnapshot(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) { - queueLen := float64(queue.Length()) - op.MetricStorage.GaugeSet(metrics.TasksQueueLength, queueLen, map[string]string{pkg.MetricKeyQueue: queue.Name}) +func (op *ShellOperator) runQueueLengthMetric() { + defer op.wg.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-op.ctx.Done(): + return + case <-ticker.C: + op.TaskQueues.IterateSnapshot(op.ctx, func(_ context.Context, q *queue.TaskQueue) { + op.MetricStorage.GaugeSet(metrics.TasksQueueLength, float64(q.Length()), + map[string]string{pkg.MetricKeyQueue: q.Name}) }) - - time.Sleep(5 * time.Second) } - }() + } } // initAndStartHookQueues create all queues defined in hooks @@ -850,21 +1071,3 @@ func (op *ShellOperator) initAndStartHookQueues() { } } } - -// Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. -func (op *ShellOperator) Shutdown() { - op.logger.Info("shutdown begin", slog.String(pkg.LogKeyPhase, "shutdown")) - op.ScheduleManager.Stop() - op.logger.Info("schedule manager stopped", slog.String(pkg.LogKeyPhase, "shutdown")) - - op.KubeEventsManager.Stop() - op.logger.Info("waiting informers", slog.String(pkg.LogKeyPhase, "shutdown")) - op.KubeEventsManager.Wait() - op.logger.Info("informers stopped", slog.String(pkg.LogKeyPhase, "shutdown")) - - op.TaskQueues.Stop() - op.logger.Info("waiting task queues", slog.String(pkg.LogKeyPhase, "shutdown")) - // Wait for queues to stop, but no more than 10 seconds - op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) - op.logger.Info("task queues stopped", slog.String(pkg.LogKeyPhase, "shutdown")) -} diff --git a/pkg/shell-operator/operator_test.go b/pkg/shell-operator/operator_test.go index e4e05949..c72b473f 100644 --- a/pkg/shell-operator/operator_test.go +++ b/pkg/shell-operator/operator_test.go @@ -36,8 +36,13 @@ func Test_Operator_startup_tasks(t *testing.T) { metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) - op := NewShellOperator(context.Background(), nil, nil, WithLogger(log.NewNop())) - op.MetricStorage = metricStorage + // newBareShellOperator is the test-only constructor that builds an empty + // operator without performing directory validation or starting any + // servers. Production callers should use NewShellOperator(ctx, cfg, ...). + op := newBareShellOperator(context.Background(), + WithLogger(log.NewNop()), + WithMetricStorage(metricStorage), + ) op.SetupEventManagers() op.setupHookManagers(app.NewConfig(), hooksDir, "") diff --git a/pkg/shell-operator/webhook_handlers.go b/pkg/shell-operator/webhook_handlers.go index 4cd5d712..b1330ebe 100644 --- a/pkg/shell-operator/webhook_handlers.go +++ b/pkg/shell-operator/webhook_handlers.go @@ -43,14 +43,18 @@ type TaskRunner func(ctx context.Context, t task.Task) queue.TaskResult type AdmissionEventHandler struct { hookManager hook.HookManager taskRunner TaskRunner + taskFactory HookTaskFactory logger *log.Logger } -// NewAdmissionEventHandler creates a new AdmissionEventHandler. -func NewAdmissionEventHandler(hm hook.HookManager, runner TaskRunner, logger *log.Logger) *AdmissionEventHandler { +// NewAdmissionEventHandler creates a new AdmissionEventHandler. The factory +// argument is used to build HookRun tasks; pass HookTaskFactory{} to use the +// default zero-value implementation. +func NewAdmissionEventHandler(hm hook.HookManager, runner TaskRunner, factory HookTaskFactory, logger *log.Logger) *AdmissionEventHandler { return &AdmissionEventHandler{ hookManager: hm, taskRunner: runner, + taskFactory: factory, logger: logger, } } @@ -71,8 +75,9 @@ func (h *AdmissionEventHandler) Handle(ctx context.Context, event admission.Even logEntry.Debug("Handle event") var admissionTask task.Task - h.hookManager.HandleAdmissionEvent(ctx, event, func(h *hook.Hook, info controller.BindingExecutionInfo) { - admissionTask = globalHookTaskFactory.NewHookRunTask(h.Name, eventBindingType, info, logLabels) + factory := h.taskFactory + h.hookManager.HandleAdmissionEvent(ctx, event, func(hk *hook.Hook, info controller.BindingExecutionInfo) { + admissionTask = factory.NewHookRunTask(hk.Name, eventBindingType, info, logLabels) }) if admissionTask == nil { @@ -104,14 +109,18 @@ func (h *AdmissionEventHandler) Handle(ctx context.Context, event admission.Even type ConversionEventHandler struct { hookManager hook.HookManager taskRunner TaskRunner + taskFactory HookTaskFactory logger *log.Logger } -// NewConversionEventHandler creates a new ConversionEventHandler. -func NewConversionEventHandler(hm hook.HookManager, runner TaskRunner, logger *log.Logger) *ConversionEventHandler { +// NewConversionEventHandler creates a new ConversionEventHandler. The factory +// argument is used to build HookRun tasks; pass HookTaskFactory{} to use the +// default zero-value implementation. +func NewConversionEventHandler(hm hook.HookManager, runner TaskRunner, factory HookTaskFactory, logger *log.Logger) *ConversionEventHandler { return &ConversionEventHandler{ hookManager: hm, taskRunner: runner, + taskFactory: factory, logger: logger, } } @@ -145,10 +154,11 @@ func (h *ConversionEventHandler) Handle(ctx context.Context, crdName string, req slog.String(pkg.LogKeyName, rule.String()), slog.Any(pkg.LogKeyValue, convPath)) + factory := h.taskFactory for _, convRule := range convPath { var convTask task.Task h.hookManager.HandleConversionEvent(ctx, crdName, request, convRule, func(hk *hook.Hook, info controller.BindingExecutionInfo) { - convTask = globalHookTaskFactory.NewHookRunTask(hk.Name, types.KubernetesConversion, info, logLabels) + convTask = factory.NewHookRunTask(hk.Name, types.KubernetesConversion, info, logLabels) }) if convTask == nil { diff --git a/pkg/shell-operator/webhook_handlers_test.go b/pkg/shell-operator/webhook_handlers_test.go index 760aa728..b32ba330 100644 --- a/pkg/shell-operator/webhook_handlers_test.go +++ b/pkg/shell-operator/webhook_handlers_test.go @@ -97,7 +97,7 @@ func TestAdmissionEventHandler_Handle_noHookFound_returnsError(t *testing.T) { runner := func(_ context.Context, _ task.Task) queue.TaskResult { return queue.TaskResult{Status: queue.Success} } - h := NewAdmissionEventHandler(hm, runner, log.NewNop()) + h := NewAdmissionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) event := admission.Event{ WebhookId: "wh1", ConfigurationId: "cfg1", @@ -118,7 +118,7 @@ func TestAdmissionEventHandler_Handle_taskRunnerFail_returnsDenied(t *testing.T) runner := func(_ context.Context, _ task.Task) queue.TaskResult { return queue.TaskResult{Status: queue.Fail} } - h := NewAdmissionEventHandler(hm, runner, log.NewNop()) + h := NewAdmissionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) event := admission.Event{ WebhookId: "wh1", ConfigurationId: "cfg1", @@ -140,7 +140,7 @@ func TestAdmissionEventHandler_Handle_taskRunnerSuccess_returnsProp(t *testing.T }, } runner := (&stubTaskRunnerWithProp{propKey: "admissionResponse", propValue: want, status: queue.Success}).run - h := NewAdmissionEventHandler(hm, runner, log.NewNop()) + h := NewAdmissionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) event := admission.Event{ WebhookId: "wh1", ConfigurationId: "cfg1", @@ -159,7 +159,7 @@ func TestAdmissionEventHandler_Handle_badPropType_returnsError(t *testing.T) { }, } runner := (&stubTaskRunnerWithProp{propKey: "admissionResponse", propValue: "wrong-type", status: queue.Success}).run - h := NewAdmissionEventHandler(hm, runner, log.NewNop()) + h := NewAdmissionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) event := admission.Event{ WebhookId: "wh1", ConfigurationId: "cfg1", @@ -182,7 +182,7 @@ func TestConversionEventHandler_Handle_noConversionPath_returnsNilError(t *testi runner := func(_ context.Context, _ task.Task) queue.TaskResult { return queue.TaskResult{Status: queue.Success} } - h := NewConversionEventHandler(hm, runner, log.NewNop()) + h := NewConversionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) req := &v1.ConversionRequest{ DesiredAPIVersion: "v2", @@ -207,7 +207,7 @@ func TestConversionEventHandler_Handle_taskRunnerFail_returnsFailedResponse(t *t runner := func(_ context.Context, _ task.Task) queue.TaskResult { return queue.TaskResult{Status: queue.Fail} } - h := NewConversionEventHandler(hm, runner, log.NewNop()) + h := NewConversionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) req := &v1.ConversionRequest{ DesiredAPIVersion: "v2", @@ -233,7 +233,7 @@ func TestConversionEventHandler_Handle_noHookForPath_returnsError(t *testing.T) runner := func(_ context.Context, _ task.Task) queue.TaskResult { return queue.TaskResult{Status: queue.Success} } - h := NewConversionEventHandler(hm, runner, log.NewNop()) + h := NewConversionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) req := &v1.ConversionRequest{ DesiredAPIVersion: "v2", @@ -254,7 +254,7 @@ func TestConversionEventHandler_Handle_badPropType_returnsError(t *testing.T) { }, } runner := (&stubTaskRunnerWithProp{propKey: "conversionResponse", propValue: "wrong-type", status: queue.Success}).run - h := NewConversionEventHandler(hm, runner, log.NewNop()) + h := NewConversionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) req := &v1.ConversionRequest{ DesiredAPIVersion: "v2", @@ -282,7 +282,7 @@ func TestConversionEventHandler_Handle_success_returnsProp(t *testing.T) { t.SetProp("conversionResponse", want) return queue.TaskResult{Status: queue.Success} } - h := NewConversionEventHandler(hm, runner, log.NewNop()) + h := NewConversionEventHandler(hm, runner, HookTaskFactory{}, log.NewNop()) req := &v1.ConversionRequest{ DesiredAPIVersion: "v2", diff --git a/pkg/task/queue/queue_set.go b/pkg/task/queue/queue_set.go index 83b829f8..81b19bfb 100644 --- a/pkg/task/queue/queue_set.go +++ b/pkg/task/queue/queue_set.go @@ -242,3 +242,35 @@ func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration) { } } } + +// Shutdown stops every queue in the set and blocks until either all workers +// reach QueueStatusStop or ctx is canceled / deadlines. Returns ctx.Err() on +// timeout and nil on a clean stop. Safe to call once; calling it on an +// already-stopped set is a no-op. +func (tqs *TaskQueueSet) Shutdown(ctx context.Context) error { + if tqs.cancel != nil { + tqs.cancel() + } + + checkTick := time.NewTicker(50 * time.Millisecond) + defer checkTick.Stop() + + for { + stopped := true + for _, q := range tqs.Queues.List() { + if q.GetStatusType() != QueueStatusStop { + stopped = false + break + } + } + if stopped { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-checkTick.C: + } + } +} diff --git a/pkg/utils/retry/retry.go b/pkg/utils/retry/retry.go new file mode 100644 index 00000000..507ac55f --- /dev/null +++ b/pkg/utils/retry/retry.go @@ -0,0 +1,54 @@ +// Package retry provides context-aware retry-with-backoff utilities. +package retry + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/deckhouse/deckhouse/pkg/log" +) + +// Config controls the parameters for [WithBackoff]. +type Config struct { + // MaxRetries is the maximum number of retry attempts (not counting the + // initial call). The total number of calls to fn is MaxRetries+1. + MaxRetries int + // InitialBackoff is the delay before the first retry; it doubles on each + // consecutive failure up to MaxBackoff. + InitialBackoff time.Duration + // MaxBackoff caps the exponential backoff delay. + MaxBackoff time.Duration +} + +// WithBackoff retries fn with exponential backoff, honouring ctx +// cancellation during sleep intervals. The caller label is used in log +// messages. Returns the last error from fn if all attempts fail, or nil on +// success. If ctx is cancelled during a backoff sleep, ctx.Err() is returned +// immediately. +func WithBackoff(ctx context.Context, cfg Config, logger *log.Logger, caller string, fn func() error) error { + backoff := cfg.InitialBackoff + var lastErr error + for attempt := 0; attempt <= cfg.MaxRetries; attempt++ { + lastErr = fn() + if lastErr == nil { + return nil + } + if attempt < cfg.MaxRetries { + logger.Warn("retrying after failure", + slog.String("caller", caller), + slog.Int("attempt", attempt+1), + slog.Duration("backoff", backoff), + log.Err(lastErr)) + + select { + case <-time.After(backoff): + case <-ctx.Done(): + return fmt.Errorf("context cancelled during backoff: %w", ctx.Err()) + } + backoff = min(backoff*2, cfg.MaxBackoff) + } + } + return lastErr +} diff --git a/pkg/utils/retry/retry_test.go b/pkg/utils/retry/retry_test.go new file mode 100644 index 00000000..72e18b1a --- /dev/null +++ b/pkg/utils/retry/retry_test.go @@ -0,0 +1,80 @@ +package retry + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/deckhouse/deckhouse/pkg/log" + "github.com/stretchr/testify/assert" +) + +func TestRetryWithBackoff_Success(t *testing.T) { + cfg := Config{ + MaxRetries: 3, + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 50 * time.Millisecond, + } + callCount := 0 + fn := func() error { + callCount++ + if callCount < 2 { + return errors.New("transient") + } + return nil + } + + err := WithBackoff(context.Background(), cfg, log.NewNop(), "test", fn) + assert.NoError(t, err) + assert.Equal(t, 2, callCount) +} + +func TestRetryWithBackoff_Exhausted(t *testing.T) { + cfg := Config{ + MaxRetries: 2, + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 50 * time.Millisecond, + } + fn := func() error { return errors.New("fail") } + + err := WithBackoff(context.Background(), cfg, log.NewNop(), "test", fn) + assert.Error(t, err) + assert.EqualError(t, err, "fail") +} + +func TestRetryWithBackoff_ContextCancelled(t *testing.T) { + cfg := Config{ + MaxRetries: 5, + InitialBackoff: 200 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } + fn := func() error { return errors.New("fail") } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := WithBackoff(ctx, cfg, log.NewNop(), "test", fn) + assert.Error(t, err) + assert.True(t, errors.Is(err, context.DeadlineExceeded), "expected DeadlineExceeded, got: %v", err) +} + +func TestRetryWithBackoff_BackoffCapsAtMax(t *testing.T) { + cfg := Config{ + MaxRetries: 5, + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 20 * time.Millisecond, + } + attempts := 0 + fn := func() error { + attempts++ + if attempts == 4 { + return nil + } + return errors.New("fail") + } + + err := WithBackoff(context.Background(), cfg, log.NewNop(), "test", fn) + assert.NoError(t, err) + assert.Equal(t, 4, attempts) +} \ No newline at end of file diff --git a/pkg/webhook/admission/manager.go b/pkg/webhook/admission/manager.go index 7858541a..2b3f103d 100644 --- a/pkg/webhook/admission/manager.go +++ b/pkg/webhook/admission/manager.go @@ -1,6 +1,7 @@ package admission import ( + "context" "fmt" "os" @@ -138,21 +139,21 @@ func (m *WebhookManager) AddMutatingWebhook(config *MutatingWebhookConfig) { r.Set(config) } -func (m *WebhookManager) Start() error { +func (m *WebhookManager) Start(ctx context.Context) error { err := m.Server.Start() if err != nil { return fmt.Errorf("start webhook server: %w", err) } for _, r := range m.ValidatingResources { - err = r.Register() + err = r.Register(ctx) if err != nil { return fmt.Errorf("register validating webhook: %w", err) } } for _, r := range m.MutatingResources { - err = r.Register() + err = r.Register(ctx) if err != nil { return fmt.Errorf("register mutating webhook: %w", err) } @@ -160,3 +161,12 @@ func (m *WebhookManager) Start() error { return nil } + +// Shutdown stops the underlying webhook https server. Safe to call when the +// manager was never started. +func (m *WebhookManager) Shutdown(ctx context.Context) error { + if m.Server == nil { + return nil + } + return m.Server.Shutdown(ctx) +} diff --git a/pkg/webhook/admission/resource.go b/pkg/webhook/admission/resource.go index f6d6b8af..09ad4637 100644 --- a/pkg/webhook/admission/resource.go +++ b/pkg/webhook/admission/resource.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strings" + "time" "github.com/deckhouse/deckhouse/pkg/log" v1 "k8s.io/api/admissionregistration/v1" @@ -12,8 +13,30 @@ import ( klient "github.com/flant/kube-client/client" "github.com/flant/shell-operator/pkg" + "github.com/flant/shell-operator/pkg/utils/retry" ) +const ( + // submitMaxRetries is the number of retry attempts for webhook configuration + // registration against the API server. Brief API server unavailability (e.g. + // during rolling restarts or leader election) should not cause a fatal exit. + // + // This retry is intentionally scoped to API-server registration calls only. + // Do not wrap full webhook-manager bootstrap with another retry layer: server + // Start() is not idempotent and may leave running listeners/goroutines. + submitMaxRetries = 5 + // submitInitialBackoff is the initial delay between retries. It doubles on + // each consecutive failure up to submitMaxBackoff. + submitInitialBackoff = 2 * time.Second + submitMaxBackoff = 15 * time.Second +) + +var submitRetryConfig = retry.Config{ + MaxRetries: submitMaxRetries, + InitialBackoff: submitInitialBackoff, + MaxBackoff: submitMaxBackoff, +} + type WebhookResourceOptions struct { KubeClient *klient.Client Namespace string @@ -45,7 +68,7 @@ func (w *ValidatingWebhookResource) Get(id string) *ValidatingWebhookConfig { return w.hooks[id] } -func (w *ValidatingWebhookResource) Register() error { +func (w *ValidatingWebhookResource) Register(ctx context.Context) error { configuration := &v1.ValidatingWebhookConfiguration{ Webhooks: []v1.ValidatingWebhook{}, } @@ -71,7 +94,7 @@ func (w *ValidatingWebhookResource) Register() error { configuration.Webhooks = append(configuration.Webhooks, *webhook.ValidatingWebhook) } - return w.submit(configuration) + return w.submit(ctx, configuration) } func (w *ValidatingWebhookResource) Unregister() error { @@ -96,31 +119,31 @@ func createWebhookPath(webhook IWebhookConfig) *string { return &res } -func (w *ValidatingWebhookResource) submit(conf *v1.ValidatingWebhookConfiguration) error { - logger := w.logger.With(slog.String(pkg.LogKeyName, conf.Name)) +func (w *ValidatingWebhookResource) submit(ctx context.Context, conf *v1.ValidatingWebhookConfiguration) error { client := w.opts.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations() - listOpts := metav1.ListOptions{ FieldSelector: "metadata.name=" + conf.Name, } - list, err := client.List(context.TODO(), listOpts) - if err != nil { - return fmt.Errorf("list ValidatingWebhookConfiguration: %w", err) - } - if len(list.Items) == 0 { - _, err = client.Create(context.TODO(), conf, pkg.DefaultCreateOptions()) - if err != nil { - logger.Error("Create ValidatingWebhookConfiguration", log.Err(err)) - } - } else { - newConf := list.Items[0] - newConf.Webhooks = conf.Webhooks - _, err = client.Update(context.TODO(), &newConf, pkg.DefaultUpdateOptions()) - if err != nil { - logger.Error("Replace ValidatingWebhookConfiguration", log.Err(err)) - } - } - return nil + + return retry.WithBackoff(ctx, submitRetryConfig, w.logger.With(slog.String(pkg.LogKeyName, conf.Name)), + "ValidatingWebhookConfiguration/submit", func() error { + list, err := client.List(ctx, listOpts) + if err != nil { + return fmt.Errorf("list ValidatingWebhookConfiguration: %w", err) + } + if len(list.Items) == 0 { + if _, err := client.Create(ctx, conf, pkg.DefaultCreateOptions()); err != nil { + return fmt.Errorf("create ValidatingWebhookConfiguration: %w", err) + } + } else { + newConf := list.Items[0] + newConf.Webhooks = conf.Webhooks + if _, err := client.Update(ctx, &newConf, pkg.DefaultUpdateOptions()); err != nil { + return fmt.Errorf("update ValidatingWebhookConfiguration: %w", err) + } + } + return nil + }) } type MutatingWebhookResource struct { @@ -146,7 +169,7 @@ func (w *MutatingWebhookResource) Get(id string) *MutatingWebhookConfig { return w.hooks[id] } -func (w *MutatingWebhookResource) Register() error { +func (w *MutatingWebhookResource) Register(ctx context.Context) error { configuration := &v1.MutatingWebhookConfiguration{ Webhooks: []v1.MutatingWebhook{}, } @@ -172,7 +195,7 @@ func (w *MutatingWebhookResource) Register() error { configuration.Webhooks = append(configuration.Webhooks, *webhook.MutatingWebhook) } - return w.submit(configuration) + return w.submit(ctx, configuration) } func (w *MutatingWebhookResource) Unregister() error { @@ -180,29 +203,29 @@ func (w *MutatingWebhookResource) Unregister() error { Delete(context.TODO(), w.opts.ConfigurationName, metav1.DeleteOptions{}) } -func (w *MutatingWebhookResource) submit(conf *v1.MutatingWebhookConfiguration) error { - logger := w.logger.With(slog.String(pkg.LogKeyName, conf.Name)) +func (w *MutatingWebhookResource) submit(ctx context.Context, conf *v1.MutatingWebhookConfiguration) error { client := w.opts.KubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations() - listOpts := metav1.ListOptions{ FieldSelector: "metadata.name=" + conf.Name, } - list, err := client.List(context.TODO(), listOpts) - if err != nil { - return fmt.Errorf("list MutatingWebhookConfiguration: %w", err) - } - if len(list.Items) == 0 { - _, err = client.Create(context.TODO(), conf, pkg.DefaultCreateOptions()) - if err != nil { - logger.Error("Create MutatingWebhookConfiguration", log.Err(err)) - } - } else { - newConf := list.Items[0] - newConf.Webhooks = conf.Webhooks - _, err = client.Update(context.TODO(), &newConf, pkg.DefaultUpdateOptions()) - if err != nil { - logger.Error("Replace MutatingWebhookConfiguration", log.Err(err)) - } - } - return nil + + return retry.WithBackoff(ctx, submitRetryConfig, w.logger.With(slog.String(pkg.LogKeyName, conf.Name)), + "MutatingWebhookConfiguration/submit", func() error { + list, err := client.List(ctx, listOpts) + if err != nil { + return fmt.Errorf("list MutatingWebhookConfiguration: %w", err) + } + if len(list.Items) == 0 { + if _, err := client.Create(ctx, conf, pkg.DefaultCreateOptions()); err != nil { + return fmt.Errorf("create MutatingWebhookConfiguration: %w", err) + } + } else { + newConf := list.Items[0] + newConf.Webhooks = conf.Webhooks + if _, err := client.Update(ctx, &newConf, pkg.DefaultUpdateOptions()); err != nil { + return fmt.Errorf("update MutatingWebhookConfiguration: %w", err) + } + } + return nil + }) } diff --git a/pkg/webhook/admission/settings.go b/pkg/webhook/admission/settings.go index d26ce6e4..03784a93 100644 --- a/pkg/webhook/admission/settings.go +++ b/pkg/webhook/admission/settings.go @@ -4,6 +4,9 @@ import ( "github.com/flant/shell-operator/pkg/webhook/server" ) +// WebhookSettings holds the configuration for a validating-webhook server. +// Library consumers build a *WebhookSettings explicitly and assign it to the +// WebhookManager's Settings field — there is no package-level singleton. type WebhookSettings struct { server.Settings CAPath string @@ -11,26 +14,3 @@ type WebhookSettings struct { ConfigurationName string DefaultFailurePolicy string } - -// DefaultSettings holds the active validating-webhook server settings. -// It is populated by InitFromSettings during operator initialization. -// Tests may use it directly; in production always call InitFromSettings first. -var DefaultSettings = &WebhookSettings{ - Settings: server.Settings{ - ServerCertPath: "/validating-certs/tls.crt", - ServerKeyPath: "/validating-certs/tls.key", - ServiceName: "shell-operator-validating-svc", - ListenAddr: "0.0.0.0", - ListenPort: "9680", - }, - CAPath: "/validating-certs/ca.crt", - ConfigurationName: "shell-operator-hooks", - DefaultFailurePolicy: "Fail", -} - -// InitFromSettings replaces DefaultSettings with values derived from cfg. -// Call this once during operator initialization, after all configuration -// sources (env vars and CLI flags) have been merged into cfg. -func InitFromSettings(s WebhookSettings) { - DefaultSettings = &s -} diff --git a/pkg/webhook/conversion/manager.go b/pkg/webhook/conversion/manager.go index 83e8ce04..f5b5b8d2 100644 --- a/pkg/webhook/conversion/manager.go +++ b/pkg/webhook/conversion/manager.go @@ -103,6 +103,15 @@ func (m *WebhookManager) Start() error { return nil } +// Shutdown stops the underlying webhook https server. Safe to call when the +// manager was never started. +func (m *WebhookManager) Shutdown(ctx context.Context) error { + if m.Server == nil { + return nil + } + return m.Server.Shutdown(ctx) +} + func (m *WebhookManager) AddWebhook(webhook *WebhookConfig) { if _, ok := m.ClientConfigs[webhook.CrdName]; !ok { m.ClientConfigs[webhook.CrdName] = &CrdClientConfig{ diff --git a/pkg/webhook/conversion/settings.go b/pkg/webhook/conversion/settings.go index 7dff5735..8488c8f9 100644 --- a/pkg/webhook/conversion/settings.go +++ b/pkg/webhook/conversion/settings.go @@ -4,29 +4,11 @@ import ( "github.com/flant/shell-operator/pkg/webhook/server" ) +// WebhookSettings holds the configuration for a conversion-webhook server. +// Library consumers build a *WebhookSettings explicitly and assign it to the +// WebhookManager's Settings field — there is no package-level singleton. type WebhookSettings struct { server.Settings CAPath string CABundle []byte } - -// DefaultSettings holds the active conversion-webhook server settings. -// It is populated by InitFromSettings during operator initialization. -// Tests may use it directly; in production always call InitFromSettings first. -var DefaultSettings = &WebhookSettings{ - Settings: server.Settings{ - ServerCertPath: "/conversion-certs/tls.crt", - ServerKeyPath: "/conversion-certs/tls.key", - ServiceName: "shell-operator-conversion-svc", - ListenAddr: "0.0.0.0", - ListenPort: "9681", - }, - CAPath: "/conversion-certs/ca.crt", -} - -// InitFromSettings replaces DefaultSettings with values derived from cfg. -// Call this once during operator initialization, after all configuration -// sources (env vars and CLI flags) have been merged into cfg. -func InitFromSettings(s WebhookSettings) { - DefaultSettings = &s -} diff --git a/pkg/webhook/server/server.go b/pkg/webhook/server/server.go index 999dc841..b2f30437 100644 --- a/pkg/webhook/server/server.go +++ b/pkg/webhook/server/server.go @@ -1,13 +1,16 @@ package server import ( + "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "log/slog" "net" "net/http" "os" + "sync" "time" "github.com/deckhouse/deckhouse/pkg/log" @@ -21,6 +24,11 @@ type WebhookServer struct { Namespace string Router chi.Router Logger *log.Logger + + mu sync.Mutex + srv *http.Server + listener net.Listener + doneCh chan struct{} } func NewWebhookServer(settings *Settings, namespace string, router chi.Router, logger *log.Logger) *WebhookServer { @@ -33,8 +41,10 @@ func NewWebhookServer(settings *Settings, namespace string, router chi.Router, l } // Start runs https server to listen for AdmissionReview requests from the API-server. +// Start does not block; the server runs in its own goroutine. A non-nil error +// means the server failed to bind or load TLS material. Once Start returns nil, +// the only way the server stops is via Shutdown. func (s *WebhookServer) Start() error { - // Load server certificate. keyPair, err := tls.LoadX509KeyPair( s.Settings.ServerCertPath, s.Settings.ServerKeyPath, @@ -43,7 +53,6 @@ func (s *WebhookServer) Start() error { return fmt.Errorf("load TLS certs: %v", err) } - // Construct a hostname for certificate. host := fmt.Sprintf("%s.%s", s.Settings.ServiceName, s.Namespace, @@ -54,7 +63,6 @@ func (s *WebhookServer) Start() error { ServerName: host, } - // Load client CA if defined if len(s.Settings.ClientCAPaths) > 0 { roots := x509.NewCertPool() @@ -75,7 +83,6 @@ func (s *WebhookServer) Start() error { } listenAddr := net.JoinHostPort(s.Settings.ListenAddr, s.Settings.ListenPort) - // Check if port is available listener, err := net.Listen("tcp", listenAddr) if err != nil { return fmt.Errorf("try listen on '%s': %v", listenAddr, err) @@ -92,15 +99,48 @@ func (s *WebhookServer) Start() error { ReadHeaderTimeout: timeout, } + doneCh := make(chan struct{}) + + s.mu.Lock() + s.srv = srv + s.listener = listener + s.doneCh = doneCh + s.mu.Unlock() + go func() { + defer close(doneCh) s.Logger.Info("Webhook server listens", slog.String(pkg.LogKeyAddress, listenAddr)) - err := srv.ServeTLS(listener, "", "") - if err != nil { - s.Logger.Error("Error starting Webhook https server", log.Err(err)) - // Stop process if server can't start. - os.Exit(1) + if err := srv.ServeTLS(listener, "", ""); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.Logger.Error("Webhook https server stopped with error", log.Err(err)) } }() return nil } + +// Shutdown gracefully stops the webhook https server. It is safe to call when +// the server was never started or has already been shut down — both return nil. +// The provided ctx bounds how long Shutdown waits for in-flight handlers. +func (s *WebhookServer) Shutdown(ctx context.Context) error { + s.mu.Lock() + srv := s.srv + doneCh := s.doneCh + s.srv = nil + s.mu.Unlock() + + if srv == nil { + return nil + } + + if err := srv.Shutdown(ctx); err != nil { + return fmt.Errorf("webhook https server shutdown: %w", err) + } + if doneCh != nil { + select { + case <-doneCh: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} diff --git a/test/hook/context/context_combiner.go b/test/hook/context/context_combiner.go index c7d0b107..8549efb6 100644 --- a/test/hook/context/context_combiner.go +++ b/test/hook/context/context_combiner.go @@ -35,11 +35,11 @@ func NewContextCombiner() *ContextCombiner { metricsstorage.WithLogger(log.NewNop()), ) - op := shell_operator.NewShellOperator( + op := shell_operator.NewBareShellOperator( context.Background(), - metricStorage, - metricStorage, shell_operator.WithLogger(log.NewNop()), + shell_operator.WithMetricStorage(metricStorage), + shell_operator.WithHookMetricStorage(metricStorage), ) op.TaskQueues = queue.NewTaskQueueSet().WithMetricStorage(metricStorage)