Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ func (ac AgentConfig) Print() {
}
fmt.Println("\tCortex API Base URL: ", ac.CortexApiBaseUrl)
if ac.CortexApiToken != "" {
fmt.Printf("\tCortex API Token: %s...%s\n", ac.CortexApiToken[0:5], ac.CortexApiToken[len(ac.CortexApiToken)-5:])
if len(ac.CortexApiToken) >= 10 {
fmt.Printf("\tCortex API Token: %s...%s\n", ac.CortexApiToken[0:5], ac.CortexApiToken[len(ac.CortexApiToken)-5:])
} else {
fmt.Printf("\tCortex API Token: ***\n")
}
}
if ac.DryRun {
fmt.Println("\tDry Run: Enabled")
Expand Down
4 changes: 2 additions & 2 deletions agent/server/handler/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (s *historyManager) getHistoryDirectory() (string, error) {
}

func (s *historyManager) getHistoryPath(handlerName string, timestamp time.Time) string {

return fmt.Sprintf("%s/%d-%s.json", s.config.HandlerHistoryPath, timestamp.UnixMilli(), handlerName)
safeName := filepath.Base(handlerName)
return fmt.Sprintf("%s/%d-%s.json", s.config.HandlerHistoryPath, timestamp.UnixMilli(), safeName)
}

func (s *historyManager) cleanupDirectory(path string, minTimestamp time.Time, maxSizeBytes int64, extractTimestamp func(info os.FileInfo) time.Time) (int, error) {
Expand Down
55 changes: 45 additions & 10 deletions agent/server/handler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"sync"
"time"

pb "github.com/cortexapps/axon/.generated/proto/github.com/cortexapps/axon"
Expand All @@ -27,6 +28,7 @@ type Manager interface {
}

type handlerManager struct {
mu sync.RWMutex
logger *zap.Logger
dispatchQueues map[string]chan Invocable
outstandingRequests map[string]Invocable
Expand Down Expand Up @@ -92,6 +94,8 @@ func (s *handlerManager) IsFinished() bool {
}

func (s *handlerManager) checkFinished() bool {
s.mu.RLock()
defer s.mu.RUnlock()
for _, entry := range s.handlers {
if !entry.IsFinished() {
return false
Expand All @@ -102,6 +106,9 @@ func (s *handlerManager) checkFinished() bool {

func (s *handlerManager) RegisterHandler(dispatchId string, name string, timeout time.Duration, options ...*pb.HandlerOption) (string, error) {

s.mu.Lock()
defer s.mu.Unlock()

if s.finished {
panic("handler manager has been closed")
}
Expand Down Expand Up @@ -152,45 +159,66 @@ func (s *handlerManager) createEntry(
}

func (s *handlerManager) UnregisterHandler(id string) {
s.mu.Lock()
entry, ok := s.handlers[id]
if !ok {
s.mu.Unlock()
return
}
s.removeHandler(entry)
s.removeHandlerLocked(entry)
s.mu.Unlock()
}

func (s *handlerManager) ClearHandlers(id string) {
s.mu.Lock()
var toClear []HandlerEntry
for _, entry := range s.handlers {
if entry.DispatchId() == id {
s.removeHandler(entry)
toClear = append(toClear, entry)
}
}
for _, entry := range toClear {
s.removeHandlerLocked(entry)
}
s.mu.Unlock()
}

func (s *handlerManager) Start(dispatchId string) error {
s.mu.RLock()
var toStart []HandlerEntry
for _, entry := range s.handlers {
if entry.DispatchId() == dispatchId {
err := entry.Start()
if err != nil {
return err
}
toStart = append(toStart, entry)
}
}
s.mu.RUnlock()
for _, entry := range toStart {
if err := entry.Start(); err != nil {
return err
}
}
return nil
}

func (s *handlerManager) Stop(dispatchId string) error {
s.mu.RLock()
var toStop []HandlerEntry
for _, entry := range s.handlers {
if entry.DispatchId() == dispatchId {
entry.Close()
toStop = append(toStop, entry)
}
}
s.mu.RUnlock()
for _, entry := range toStop {
entry.Close()
}
return nil
}

func (s *handlerManager) removeHandler(entry HandlerEntry) {
entry.Close()
// removeHandlerLocked removes a handler while the caller already holds s.mu.
func (s *handlerManager) removeHandlerLocked(entry HandlerEntry) {
delete(s.handlers, entry.Id())
entry.Close()
}

func (s *handlerManager) Dequeue(ctx context.Context, dispatchId string, waitTime time.Duration) (Invocable, error) {
Expand All @@ -209,7 +237,9 @@ func (s *handlerManager) Dequeue(ctx context.Context, dispatchId string, waitTim
func (s *handlerManager) Trigger(handler Invocable) error {

handlerName := handler.GetEntry().Name()
s.mu.RLock()
entry := s.handlers[handler.GetEntry().Id()]
s.mu.RUnlock()

if entry == nil {
s.logger.Error("handler not found", zap.String("handler", handlerName))
Expand Down Expand Up @@ -261,7 +291,8 @@ func (s *handlerManager) Trigger(handler Invocable) error {
}

func (s *handlerManager) getDispatchQueue(DispatchId string) chan Invocable {

s.mu.Lock()
defer s.mu.Unlock()
queue, ok := s.dispatchQueues[DispatchId]
if !ok {
queue = make(chan Invocable, 100)
Expand All @@ -271,6 +302,8 @@ func (s *handlerManager) getDispatchQueue(DispatchId string) chan Invocable {
}

func (s *handlerManager) ListHandlers() []HandlerEntry {
s.mu.RLock()
defer s.mu.RUnlock()
var entries []HandlerEntry
for _, entry := range s.handlers {
entries = append(entries, entry)
Expand All @@ -279,6 +312,8 @@ func (s *handlerManager) ListHandlers() []HandlerEntry {
}

func (s *handlerManager) GetByTag(tag string) HandlerEntry {
s.mu.RLock()
defer s.mu.RUnlock()
for _, entry := range s.handlers {
if entry.Tag() == tag && entry.IsActive() {
return entry
Expand Down
13 changes: 12 additions & 1 deletion agent/server/snykbroker/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httputil"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -24,6 +25,7 @@ type RegistrationReflector struct {
logger *zap.Logger
transport *http.Transport
server cortexHttp.Server
mu sync.RWMutex
targets map[string]proxyEntry
serverStarted atomic.Bool
mode config.RelayReflectorMode
Expand Down Expand Up @@ -152,6 +154,9 @@ func (rr *RegistrationReflector) getProxy(targetURI string, isDefault bool, head

key := newEntry.key()

rr.mu.Lock()
defer rr.mu.Unlock()

entry, exists := rr.targets[key]
if !exists {
entry = *newEntry
Expand Down Expand Up @@ -188,6 +193,10 @@ func (rr *RegistrationReflector) parseTargetUri(proxyPath string) (*proxyEntry,
remainder = path[slash:]
}
hash := rr.extractHash(beforeSlash)

rr.mu.RLock()
defer rr.mu.RUnlock()

if hash == "" {
// find the default proxy entry
if entry, exists := rr.targets["default"]; exists {
Expand Down Expand Up @@ -245,6 +254,9 @@ func (rr *RegistrationReflector) getUriForTarget(target string) (string, error)
return "", fmt.Errorf("target URI cannot be empty")
}

rr.mu.RLock()
defer rr.mu.RUnlock()

for _, entry := range rr.targets {
if entry.TargetURI == target {
return entry.proxyURI, nil
Expand Down Expand Up @@ -289,7 +301,6 @@ func (rr *RegistrationReflector) ServeHTTP(w http.ResponseWriter, r *http.Reques
if err != nil {
rr.logger.Error("Failed to find Entry for target URI", zap.Error(err))
http.Error(w, "Invalid target URI", http.StatusBadGateway)
w.WriteHeader(http.StatusBadGateway)
return
}

Expand Down
31 changes: 23 additions & 8 deletions agent/server/snykbroker/relay_instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ import (
const defaultSnykBroker = "snyk-broker"
const brokerPort = 7343

// maskToken returns a masked version of a token for safe logging.
// Shows first 4 and last 4 characters, e.g. "abcd...wxyz".
func maskToken(token string) string {
if len(token) <= 8 {
return "***"
}
return token[:4] + "..." + token[len(token)-4:]
}

// restartRequest is sent to the restart channel by any code path that
// needs to restart the broker. The generation field ties the request
// to the broker lifecycle that triggered it, so the consumer can
Expand Down Expand Up @@ -187,8 +196,14 @@ func (r *relayInstanceManager) handleReregister(w http.ResponseWriter, req *http
}

if info.HasChanged {
r.Restart()
if err := r.Restart(); err != nil {
r.logger.Error("Unable to restart after reregistration", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Unable to restart after reregistration"))
return
}
}
w.WriteHeader(http.StatusOK)
}

func (r *relayInstanceManager) getSnykBrokerPort() int {
Expand All @@ -212,16 +227,16 @@ func (r *relayInstanceManager) handleSystemCheck(w http.ResponseWriter, req *htt
return
}
defer resp.Body.Close()
for k, v := range resp.Header {
w.Header().Set(k, strings.Join(v, ","))
}
w.WriteHeader(resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
r.logger.Error("Unable to read system check response", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
for k, v := range resp.Header {
w.Header().Set(k, strings.Join(v, ","))
}
w.WriteHeader(resp.StatusCode)
w.Write(body)

}
Expand Down Expand Up @@ -394,7 +409,7 @@ func (r *relayInstanceManager) getUrlAndToken() (*tokenInfo, error) {
}

if !tokenInfo.equals(r.tokenInfo) {
r.logger.Info("Registration info has changed", zap.String("uri", tokenInfo.ServerUri), zap.String("token", tokenInfo.Token))
r.logger.Info("Registration info has changed", zap.String("uri", tokenInfo.ServerUri), zap.String("token", maskToken(tokenInfo.Token)))
tokenInfo.HasChanged = true
r.tokenInfo = tokenInfo
}
Expand Down Expand Up @@ -598,7 +613,7 @@ func (r *relayInstanceManager) Start() error {
r.logger.Debug("Starting broker",
zap.String("executable", executable),
zap.Strings("args", args),
zap.String("token", info.Token),
zap.String("token", maskToken(info.Token)),
zap.String("uri", info.ServerUri),
zap.String("acceptFile", tmpAcceptFile),
)
Expand All @@ -622,7 +637,7 @@ func (r *relayInstanceManager) Start() error {
key := strings.TrimPrefix(parts[0], prefix)
value := parts[1]
brokerEnv[key] = value
r.logger.Debug("Adding SNYKBROKER_ environment variable", zap.String("key", key), zap.String("value", value))
r.logger.Debug("Adding SNYKBROKER_ environment variable", zap.String("key", key))
}
}

Expand Down
22 changes: 13 additions & 9 deletions agent/server/snykbroker/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/exec"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -189,20 +190,23 @@ func (b *Supervisor) runCommand() error {
// sigChan triggers shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigChan)

running := make(chan struct{})

go func() {
<-sigChan
b.stopFunc()
sig, ok := <-sigChan
if ok && sig != nil {
b.stopFunc()
}
}()

killed := false
var killed atomic.Bool

// our stopfunc allows anyone to close the process
// and wait for it to finish
b.stopFunc = func() {
killed = true
killed.Store(true)

// if process is running trigger a kill
if cmd.Process != nil {
Expand Down Expand Up @@ -244,7 +248,7 @@ func (b *Supervisor) runCommand() error {
stopStdErr()
wg.Wait()

if killed {
if killed.Load() {
err = errKilled
fmt.Printf("Process %v (pid=%v) killed\n", cmd.Path, pid)
}
Expand All @@ -266,20 +270,20 @@ func (b *Supervisor) Close() error {

func (b *Supervisor) scanLines(reader io.Reader, output chan string, refCount *sync.WaitGroup) func() {

done := false
var done atomic.Bool

refCount.Add(1)

// increase buffer size from default of 60K to 1MB
buffer := make([]byte, 1024*1024)
go func() {
for !done {
for !done.Load() {
scanner := bufio.NewScanner(reader)
scanner.Buffer(buffer, cap(buffer)-1)
for scanner.Scan() {
ln := scanner.Text()
output <- ln
if done {
if done.Load() {
return
}
}
Expand All @@ -302,7 +306,7 @@ func (b *Supervisor) scanLines(reader io.Reader, output chan string, refCount *s
}()

return func() {
done = true
done.Store(true)
refCount.Done()
}
}
Loading