diff --git a/agent/config/config.go b/agent/config/config.go index c3e7516..f6461f1 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -99,7 +99,11 @@ func (ac AgentConfig) Print() { } fmt.Println("\tCortex API Base URL: ", ac.CortexApiBaseUrl) if ac.CortexApiToken != "" { - fmt.Printf("\tCortex API Token: %s...%s\n", ac.CortexApiToken[0:5], ac.CortexApiToken[len(ac.CortexApiToken)-5:]) + if len(ac.CortexApiToken) >= 10 { + fmt.Printf("\tCortex API Token: %s...%s\n", ac.CortexApiToken[0:5], ac.CortexApiToken[len(ac.CortexApiToken)-5:]) + } else { + fmt.Printf("\tCortex API Token: ***\n") + } } if ac.DryRun { fmt.Println("\tDry Run: Enabled") diff --git a/agent/server/handler/history_manager.go b/agent/server/handler/history_manager.go index 0feffe4..cd41e2d 100644 --- a/agent/server/handler/history_manager.go +++ b/agent/server/handler/history_manager.go @@ -110,8 +110,8 @@ func (s *historyManager) getHistoryDirectory() (string, error) { } func (s *historyManager) getHistoryPath(handlerName string, timestamp time.Time) string { - - return fmt.Sprintf("%s/%d-%s.json", s.config.HandlerHistoryPath, timestamp.UnixMilli(), handlerName) + safeName := filepath.Base(handlerName) + return fmt.Sprintf("%s/%d-%s.json", s.config.HandlerHistoryPath, timestamp.UnixMilli(), safeName) } func (s *historyManager) cleanupDirectory(path string, minTimestamp time.Time, maxSizeBytes int64, extractTimestamp func(info os.FileInfo) time.Time) (int, error) { diff --git a/agent/server/handler/manager.go b/agent/server/handler/manager.go index 55ad489..387e497 100644 --- a/agent/server/handler/manager.go +++ b/agent/server/handler/manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "sync" "time" pb "github.com/cortexapps/axon/.generated/proto/github.com/cortexapps/axon" @@ -27,6 +28,7 @@ type Manager interface { } type handlerManager struct { + mu sync.RWMutex logger *zap.Logger dispatchQueues map[string]chan Invocable outstandingRequests map[string]Invocable @@ -92,6 +94,8 @@ func (s *handlerManager) IsFinished() bool { } func (s *handlerManager) checkFinished() bool { + s.mu.RLock() + defer s.mu.RUnlock() for _, entry := range s.handlers { if !entry.IsFinished() { return false @@ -102,6 +106,9 @@ func (s *handlerManager) checkFinished() bool { func (s *handlerManager) RegisterHandler(dispatchId string, name string, timeout time.Duration, options ...*pb.HandlerOption) (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.finished { panic("handler manager has been closed") } @@ -152,45 +159,66 @@ func (s *handlerManager) createEntry( } func (s *handlerManager) UnregisterHandler(id string) { + s.mu.Lock() entry, ok := s.handlers[id] if !ok { + s.mu.Unlock() return } - s.removeHandler(entry) + s.removeHandlerLocked(entry) + s.mu.Unlock() } func (s *handlerManager) ClearHandlers(id string) { + s.mu.Lock() + var toClear []HandlerEntry for _, entry := range s.handlers { if entry.DispatchId() == id { - s.removeHandler(entry) + toClear = append(toClear, entry) } } + for _, entry := range toClear { + s.removeHandlerLocked(entry) + } + s.mu.Unlock() } func (s *handlerManager) Start(dispatchId string) error { + s.mu.RLock() + var toStart []HandlerEntry for _, entry := range s.handlers { if entry.DispatchId() == dispatchId { - err := entry.Start() - if err != nil { - return err - } + toStart = append(toStart, entry) + } + } + s.mu.RUnlock() + for _, entry := range toStart { + if err := entry.Start(); err != nil { + return err } } return nil } func (s *handlerManager) Stop(dispatchId string) error { + s.mu.RLock() + var toStop []HandlerEntry for _, entry := range s.handlers { if entry.DispatchId() == dispatchId { - entry.Close() + toStop = append(toStop, entry) } } + s.mu.RUnlock() + for _, entry := range toStop { + entry.Close() + } return nil } -func (s *handlerManager) removeHandler(entry HandlerEntry) { - entry.Close() +// removeHandlerLocked removes a handler while the caller already holds s.mu. +func (s *handlerManager) removeHandlerLocked(entry HandlerEntry) { delete(s.handlers, entry.Id()) + entry.Close() } func (s *handlerManager) Dequeue(ctx context.Context, dispatchId string, waitTime time.Duration) (Invocable, error) { @@ -209,7 +237,9 @@ func (s *handlerManager) Dequeue(ctx context.Context, dispatchId string, waitTim func (s *handlerManager) Trigger(handler Invocable) error { handlerName := handler.GetEntry().Name() + s.mu.RLock() entry := s.handlers[handler.GetEntry().Id()] + s.mu.RUnlock() if entry == nil { s.logger.Error("handler not found", zap.String("handler", handlerName)) @@ -261,7 +291,8 @@ func (s *handlerManager) Trigger(handler Invocable) error { } func (s *handlerManager) getDispatchQueue(DispatchId string) chan Invocable { - + s.mu.Lock() + defer s.mu.Unlock() queue, ok := s.dispatchQueues[DispatchId] if !ok { queue = make(chan Invocable, 100) @@ -271,6 +302,8 @@ func (s *handlerManager) getDispatchQueue(DispatchId string) chan Invocable { } func (s *handlerManager) ListHandlers() []HandlerEntry { + s.mu.RLock() + defer s.mu.RUnlock() var entries []HandlerEntry for _, entry := range s.handlers { entries = append(entries, entry) @@ -279,6 +312,8 @@ func (s *handlerManager) ListHandlers() []HandlerEntry { } func (s *handlerManager) GetByTag(tag string) HandlerEntry { + s.mu.RLock() + defer s.mu.RUnlock() for _, entry := range s.handlers { if entry.Tag() == tag && entry.IsActive() { return entry diff --git a/agent/server/snykbroker/reflector.go b/agent/server/snykbroker/reflector.go index e452f04..bfd4232 100644 --- a/agent/server/snykbroker/reflector.go +++ b/agent/server/snykbroker/reflector.go @@ -8,6 +8,7 @@ import ( "net/http/httputil" "net/url" "strings" + "sync" "sync/atomic" "time" @@ -24,6 +25,7 @@ type RegistrationReflector struct { logger *zap.Logger transport *http.Transport server cortexHttp.Server + mu sync.RWMutex targets map[string]proxyEntry serverStarted atomic.Bool mode config.RelayReflectorMode @@ -152,6 +154,9 @@ func (rr *RegistrationReflector) getProxy(targetURI string, isDefault bool, head key := newEntry.key() + rr.mu.Lock() + defer rr.mu.Unlock() + entry, exists := rr.targets[key] if !exists { entry = *newEntry @@ -188,6 +193,10 @@ func (rr *RegistrationReflector) parseTargetUri(proxyPath string) (*proxyEntry, remainder = path[slash:] } hash := rr.extractHash(beforeSlash) + + rr.mu.RLock() + defer rr.mu.RUnlock() + if hash == "" { // find the default proxy entry if entry, exists := rr.targets["default"]; exists { @@ -245,6 +254,9 @@ func (rr *RegistrationReflector) getUriForTarget(target string) (string, error) return "", fmt.Errorf("target URI cannot be empty") } + rr.mu.RLock() + defer rr.mu.RUnlock() + for _, entry := range rr.targets { if entry.TargetURI == target { return entry.proxyURI, nil @@ -289,7 +301,6 @@ func (rr *RegistrationReflector) ServeHTTP(w http.ResponseWriter, r *http.Reques if err != nil { rr.logger.Error("Failed to find Entry for target URI", zap.Error(err)) http.Error(w, "Invalid target URI", http.StatusBadGateway) - w.WriteHeader(http.StatusBadGateway) return } diff --git a/agent/server/snykbroker/relay_instance_manager.go b/agent/server/snykbroker/relay_instance_manager.go index a1efe34..c3fa22d 100644 --- a/agent/server/snykbroker/relay_instance_manager.go +++ b/agent/server/snykbroker/relay_instance_manager.go @@ -27,6 +27,15 @@ import ( const defaultSnykBroker = "snyk-broker" const brokerPort = 7343 +// maskToken returns a masked version of a token for safe logging. +// Shows first 4 and last 4 characters, e.g. "abcd...wxyz". +func maskToken(token string) string { + if len(token) <= 8 { + return "***" + } + return token[:4] + "..." + token[len(token)-4:] +} + // restartRequest is sent to the restart channel by any code path that // needs to restart the broker. The generation field ties the request // to the broker lifecycle that triggered it, so the consumer can @@ -187,8 +196,14 @@ func (r *relayInstanceManager) handleReregister(w http.ResponseWriter, req *http } if info.HasChanged { - r.Restart() + if err := r.Restart(); err != nil { + r.logger.Error("Unable to restart after reregistration", zap.Error(err)) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Unable to restart after reregistration")) + return + } } + w.WriteHeader(http.StatusOK) } func (r *relayInstanceManager) getSnykBrokerPort() int { @@ -212,16 +227,16 @@ func (r *relayInstanceManager) handleSystemCheck(w http.ResponseWriter, req *htt return } defer resp.Body.Close() - for k, v := range resp.Header { - w.Header().Set(k, strings.Join(v, ",")) - } - w.WriteHeader(resp.StatusCode) body, err := io.ReadAll(resp.Body) if err != nil { r.logger.Error("Unable to read system check response", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) return } + for k, v := range resp.Header { + w.Header().Set(k, strings.Join(v, ",")) + } + w.WriteHeader(resp.StatusCode) w.Write(body) } @@ -394,7 +409,7 @@ func (r *relayInstanceManager) getUrlAndToken() (*tokenInfo, error) { } if !tokenInfo.equals(r.tokenInfo) { - r.logger.Info("Registration info has changed", zap.String("uri", tokenInfo.ServerUri), zap.String("token", tokenInfo.Token)) + r.logger.Info("Registration info has changed", zap.String("uri", tokenInfo.ServerUri), zap.String("token", maskToken(tokenInfo.Token))) tokenInfo.HasChanged = true r.tokenInfo = tokenInfo } @@ -598,7 +613,7 @@ func (r *relayInstanceManager) Start() error { r.logger.Debug("Starting broker", zap.String("executable", executable), zap.Strings("args", args), - zap.String("token", info.Token), + zap.String("token", maskToken(info.Token)), zap.String("uri", info.ServerUri), zap.String("acceptFile", tmpAcceptFile), ) @@ -622,7 +637,7 @@ func (r *relayInstanceManager) Start() error { key := strings.TrimPrefix(parts[0], prefix) value := parts[1] brokerEnv[key] = value - r.logger.Debug("Adding SNYKBROKER_ environment variable", zap.String("key", key), zap.String("value", value)) + r.logger.Debug("Adding SNYKBROKER_ environment variable", zap.String("key", key)) } } diff --git a/agent/server/snykbroker/supervisor.go b/agent/server/snykbroker/supervisor.go index ff82fa5..d052db5 100644 --- a/agent/server/snykbroker/supervisor.go +++ b/agent/server/snykbroker/supervisor.go @@ -10,6 +10,7 @@ import ( "os/exec" "os/signal" "sync" + "sync/atomic" "syscall" "time" @@ -189,20 +190,23 @@ func (b *Supervisor) runCommand() error { // sigChan triggers shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigChan) running := make(chan struct{}) go func() { - <-sigChan - b.stopFunc() + sig, ok := <-sigChan + if ok && sig != nil { + b.stopFunc() + } }() - killed := false + var killed atomic.Bool // our stopfunc allows anyone to close the process // and wait for it to finish b.stopFunc = func() { - killed = true + killed.Store(true) // if process is running trigger a kill if cmd.Process != nil { @@ -244,7 +248,7 @@ func (b *Supervisor) runCommand() error { stopStdErr() wg.Wait() - if killed { + if killed.Load() { err = errKilled fmt.Printf("Process %v (pid=%v) killed\n", cmd.Path, pid) } @@ -266,20 +270,20 @@ func (b *Supervisor) Close() error { func (b *Supervisor) scanLines(reader io.Reader, output chan string, refCount *sync.WaitGroup) func() { - done := false + var done atomic.Bool refCount.Add(1) // increase buffer size from default of 60K to 1MB buffer := make([]byte, 1024*1024) go func() { - for !done { + for !done.Load() { scanner := bufio.NewScanner(reader) scanner.Buffer(buffer, cap(buffer)-1) for scanner.Scan() { ln := scanner.Text() output <- ln - if done { + if done.Load() { return } } @@ -302,7 +306,7 @@ func (b *Supervisor) scanLines(reader io.Reader, output chan string, refCount *s }() return func() { - done = true + done.Store(true) refCount.Done() } }