diff --git a/internal/detector/nodescan.go b/internal/detector/nodescan.go index 696ce95..12b26a9 100644 --- a/internal/detector/nodescan.go +++ b/internal/detector/nodescan.go @@ -34,12 +34,23 @@ type NodeScanner struct { exec executor.Executor log *progress.Logger loggedInUser string // when non-empty and running as root, commands run as this user + // ProgressHook, when non-nil, is invoked from inside ScanProjects / + // ScanGlobalPackages with a short human-readable detail string ("project + // 12 of 47", "scanning yarn", ...). Telemetry plumbs this into + // PhaseTracker.UpdateDetail so heartbeats surface mid-phase progress. + ProgressHook func(detail string) } func NewNodeScanner(exec executor.Executor, log *progress.Logger, loggedInUser string) *NodeScanner { return &NodeScanner{exec: exec, log: log, loggedInUser: loggedInUser} } +func (s *NodeScanner) emitProgress(detail string) { + if s.ProgressHook != nil { + s.ProgressHook(detail) + } +} + // shouldRunAsUser returns true when commands should be delegated to the logged-in user. // Only applies on Unix — RunAsUser uses sudo which is not available on Windows. func (s *NodeScanner) shouldRunAsUser() bool { @@ -107,16 +118,19 @@ func (s *NodeScanner) checkPath(ctx context.Context, name string) error { func (s *NodeScanner) ScanGlobalPackages(ctx context.Context) []model.NodeScanResult { var results []model.NodeScanResult + s.emitProgress("global: npm") s.log.Progress(" Checking npm global packages...") if r, ok := s.scanNPMGlobal(ctx); ok { results = append(results, r) } + s.emitProgress("global: yarn") s.log.Progress(" Checking yarn global packages...") if r, ok := s.scanYarnGlobal(ctx); ok { results = append(results, r) } + s.emitProgress("global: pnpm") s.log.Progress(" Checking pnpm global packages...") if r, ok := s.scanPnpmGlobal(ctx); ok { results = append(results, r) @@ -354,6 +368,10 @@ func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string) []m var results []model.NodeScanResult totalSize := int64(0) + totalProjects := len(projects) + if totalProjects > maxNodeProjects { + totalProjects = maxNodeProjects + } for i, p := range projects { if i >= maxNodeProjects { s.log.Progress(" Reached maximum of %d projects, stopping search", maxNodeProjects) @@ -366,6 +384,11 @@ func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string) []m break } + // Per-project sub-progress for the heartbeat goroutine. Surfaces + // to console as "current_phase_detail: project 12 of 47" so a + // stuck scan is visibly so, not just opaque "node_scan in progress". + s.emitProgress(fmt.Sprintf("project %d of %d", i+1, totalProjects)) + s.log.Progress(" Found project: %s", p.dir) pm := DetectProjectPM(s.exec, p.dir) s.log.Progress(" Package manager: %s", pm) diff --git a/internal/detector/pythonscan.go b/internal/detector/pythonscan.go index 5283035..202f4e7 100644 --- a/internal/detector/pythonscan.go +++ b/internal/detector/pythonscan.go @@ -15,12 +15,23 @@ import ( type PythonScanner struct { exec executor.Executor log *progress.Logger + // ProgressHook, when non-nil, is invoked from inside ScanGlobalPackages + // with a short human-readable detail string ("scanning pip3", ...). + // Telemetry plumbs this into PhaseTracker.UpdateDetail so heartbeats + // surface mid-phase progress. + ProgressHook func(detail string) } func NewPythonScanner(exec executor.Executor, log *progress.Logger) *PythonScanner { return &PythonScanner{exec: exec, log: log} } +func (s *PythonScanner) emitProgress(detail string) { + if s.ProgressHook != nil { + s.ProgressHook(detail) + } +} + type pythonScanSpec struct { binary string name string @@ -49,6 +60,7 @@ func (s *PythonScanner) ScanGlobalPackages(ctx context.Context) []model.PythonSc continue } + s.emitProgress("scanning " + spec.name) s.log.Progress(" Checking %s global packages...", spec.name) version := s.getVersion(ctx, spec.binary, spec.versionCmd) diff --git a/internal/launchd/launchd.go b/internal/launchd/launchd.go index 5f8bdcd..eb2e14b 100644 --- a/internal/launchd/launchd.go +++ b/internal/launchd/launchd.go @@ -20,8 +20,22 @@ const ( systemLogDir = "/var/log/stepsecurity" ) +// DaemonPlistPath is the system-wide launchd plist installed when the agent +// runs as root. Exported so other packages (notably telemetry's invocation +// detector) can check for an installed footprint without re-deriving the path. +const DaemonPlistPath = daemonPlistPath + +// UserPlistPath returns the per-user launchd plist path installed when the +// agent runs without root. Empty when the home directory cannot be resolved. +func UserPlistPath() string { + return agentPlistPath() +} + func agentPlistPath() string { homeDir, _ := os.UserHomeDir() + if homeDir == "" { + return "" + } return homeDir + "/Library/LaunchAgents/com.stepsecurity.agent.plist" } diff --git a/internal/schtasks/schtasks.go b/internal/schtasks/schtasks.go index 7025ead..f1eb68c 100644 --- a/internal/schtasks/schtasks.go +++ b/internal/schtasks/schtasks.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + osexec "os/exec" "strconv" "github.com/step-security/dev-machine-guard/internal/config" @@ -14,6 +15,17 @@ import ( const taskName = "StepSecurity Dev Machine Guard" +// IsTaskRegistered reports whether the Windows scheduled task created by +// `dev-machine-guard install` is currently registered. Used by the +// telemetry package's invocation detector to distinguish a manual CLI run +// from a scheduler-triggered one. Any error or non-zero schtasks exit is +// treated as "not registered" so a transient Schedule-service hiccup +// degrades to "one_time" rather than erroring the run. +func IsTaskRegistered() bool { + cmd := osexec.Command("schtasks", "/query", "/tn", taskName) + return cmd.Run() == nil +} + // Install configures Windows Task Scheduler for periodic scanning. // If already installed, upgrades by removing and re-creating the task. func Install(exec executor.Executor, log *progress.Logger) error { diff --git a/internal/systemd/systemd.go b/internal/systemd/systemd.go index 8d61b92..24a34a3 100644 --- a/internal/systemd/systemd.go +++ b/internal/systemd/systemd.go @@ -17,6 +17,18 @@ import ( const unitName = "stepsecurity-dev-machine-guard" +// TimerUnitPath returns the per-user systemd timer unit path installed for +// periodic scanning. Exported so the telemetry package's invocation detector +// can stat for an installed footprint without re-deriving the path. Returns +// empty when the home directory cannot be resolved. +func TimerUnitPath() string { + homeDir, _ := os.UserHomeDir() + if homeDir == "" { + return "" + } + return filepath.Join(homeDir, ".config", "systemd", "user", unitName+".timer") +} + // Install configures a systemd user timer for periodic scanning. // If already installed, upgrades by removing and re-creating the units. func Install(exec executor.Executor, log *progress.Logger) error { diff --git a/internal/telemetry/heartbeat_shutdown_test.go b/internal/telemetry/heartbeat_shutdown_test.go new file mode 100644 index 0000000..5874529 --- /dev/null +++ b/internal/telemetry/heartbeat_shutdown_test.go @@ -0,0 +1,49 @@ +package telemetry + +import ( + "context" + "testing" + "time" +) + +// TestHeartbeatShutdown_NoDeadlock mirrors the cancel-then-wait pattern +// Run() uses to shut down its heartbeat goroutine. Two `defer` statements +// (cancel + wait) would deadlock under LIFO ordering — wait runs first, +// blocks on the goroutine, and cancel never fires. Combining them into a +// single deferred function is the fix; this test pins it down. +func TestHeartbeatShutdown_NoDeadlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + go func() { + defer close(done) + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // no-op, mimics postPhase + } + } + }() + + // This is the load-bearing pattern from Run(): cancel first, THEN wait. + // If a future refactor splits these into separate `defer` statements at + // the top level of Run(), the LIFO ordering will deadlock. + shutdownStart := time.Now() + func() { + defer func() { + cancel() + <-done + }() + }() + elapsed := time.Since(shutdownStart) + + // 50ms ticker + small scheduler overhead; 1s is generous and signals a + // hang clearly if the pattern ever regresses. + if elapsed > time.Second { + t.Fatalf("heartbeat shutdown took %s — likely deadlocked on defer ordering", elapsed) + } +} diff --git a/internal/telemetry/invocation.go b/internal/telemetry/invocation.go new file mode 100644 index 0000000..aef7f42 --- /dev/null +++ b/internal/telemetry/invocation.go @@ -0,0 +1,54 @@ +package telemetry + +import ( + "os" + "runtime" + + "github.com/step-security/dev-machine-guard/internal/launchd" + "github.com/step-security/dev-machine-guard/internal/schtasks" + "github.com/step-security/dev-machine-guard/internal/systemd" +) + +// Wire-format values for the invocation_method field. Kept stable — +// console and backend match on these literal strings. +const ( + InvocationInstall = "install" + InvocationOneTime = "one_time" +) + +// DetectInvocationMethod returns "install" when the dev-machine-guard +// scheduler footprint is present on this machine, else "one_time". +// +// The check is best-effort and never returns an error: a stat failure or a +// flaky schtasks call degrades to "one_time" so an unknown environment is +// never misreported as an installed agent. Detection is filesystem-based on +// darwin/linux and a single schtasks query on windows, so an agent rolled +// out before this code shipped starts reporting "install" on its next +// scheduled fire without any installer changes. +func DetectInvocationMethod() string { + if isSchedulerInstalled() { + return InvocationInstall + } + return InvocationOneTime +} + +func isSchedulerInstalled() bool { + switch runtime.GOOS { + case "darwin": + return fileExists(launchd.DaemonPlistPath) || fileExists(launchd.UserPlistPath()) + case "linux": + return fileExists(systemd.TimerUnitPath()) + case "windows": + return schtasks.IsTaskRegistered() + default: + return false + } +} + +func fileExists(path string) bool { + if path == "" { + return false + } + info, err := os.Stat(path) + return err == nil && !info.IsDir() +} diff --git a/internal/telemetry/invocation_test.go b/internal/telemetry/invocation_test.go new file mode 100644 index 0000000..31548f3 --- /dev/null +++ b/internal/telemetry/invocation_test.go @@ -0,0 +1,120 @@ +package telemetry + +import ( + "os" + "path/filepath" + "runtime" + "strings" + "testing" + + "github.com/step-security/dev-machine-guard/internal/launchd" + "github.com/step-security/dev-machine-guard/internal/systemd" +) + +func TestFileExists(t *testing.T) { + dir := t.TempDir() + present := filepath.Join(dir, "marker") + if err := os.WriteFile(present, []byte("x"), 0o600); err != nil { + t.Fatal(err) + } + + cases := []struct { + name string + path string + want bool + }{ + {"existing file", present, true}, + {"missing file", filepath.Join(dir, "nope"), false}, + {"empty path", "", false}, + {"directory", dir, false}, // dirs intentionally don't count as installs + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := fileExists(tc.path); got != tc.want { + t.Fatalf("fileExists(%q) = %v, want %v", tc.path, got, tc.want) + } + }) + } +} + +// TestDetectInvocationMethod_HostMachine exercises the detector against the +// real machine. The result is whatever the current dev box reports; we can +// only assert the value is one of the two valid wire-format strings. +func TestDetectInvocationMethod_HostMachine(t *testing.T) { + got := DetectInvocationMethod() + if got != InvocationInstall && got != InvocationOneTime { + t.Fatalf("DetectInvocationMethod returned %q, want %q or %q", + got, InvocationInstall, InvocationOneTime) + } +} + +// TestDetectInvocationMethod_RespondsToFilesystem covers the darwin/linux +// path that stats a scheduler artifact. On Windows the check shells out to +// schtasks, which we can't safely stub without an executor seam — skip there. +// +// Sandboxes HOME (Unix) and USERPROFILE (Windows-safe no-op on Unix) under +// t.TempDir() so launchd.UserPlistPath / systemd.TimerUnitPath compute paths +// that live entirely inside the temp tree. Without this the test would write +// markers (and MkdirAll-created parent dirs) into the developer's real +// ~/Library/LaunchAgents or ~/.config/systemd/user — leaving stray files +// behind on CI and risking a tiny TOCTOU window against a real install. +func TestDetectInvocationMethod_RespondsToFilesystem(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("windows uses schtasks /query, not filesystem") + } + + tempHome := t.TempDir() + t.Setenv("HOME", tempHome) + t.Setenv("USERPROFILE", tempHome) // no-op on Unix but cheap and keeps the seam consistent + + // Resolve the platform's expected artifact path AFTER the env override + // so os.UserHomeDir() returns tempHome. + var path string + switch runtime.GOOS { + case "darwin": + path = launchd.UserPlistPath() + case "linux": + path = systemd.TimerUnitPath() + default: + t.Skipf("no scheduler artifact path on %s", runtime.GOOS) + } + if path == "" { + t.Skip("could not resolve scheduler artifact path on this host") + } + if !strings.HasPrefix(path, tempHome) { + t.Fatalf("resolved path %q escaped tempHome %q — env sandbox is not effective", path, tempHome) + } + + // Fresh temp home — detector starts at one_time, flips to install when + // the marker appears, flips back when it's removed. + if got := DetectInvocationMethod(); got != InvocationOneTime { + t.Fatalf("on clean temp home, detector returned %q, want %q", + got, InvocationOneTime) + } + + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatalf("prepare scheduler artifact dir: %v", err) + } + if err := os.WriteFile(path, []byte("x"), 0o600); err != nil { + t.Fatalf("write fake scheduler artifact: %v", err) + } + // No explicit cleanup: everything lives under t.TempDir() and is + // removed by the testing framework when the test ends. + + if got := DetectInvocationMethod(); got != InvocationInstall { + t.Fatalf("after creating %q, detector returned %q, want %q", + path, got, InvocationInstall) + } + + // Remove the marker mid-test and re-check — confirms detection is not + // cached and reflects current filesystem state. + if err := os.Remove(path); err != nil { + t.Fatalf("remove fake artifact: %v", err) + } + + if got := DetectInvocationMethod(); got != InvocationOneTime { + t.Fatalf("after removing %q, detector returned %q, want %q", + path, got, InvocationOneTime) + } +} diff --git a/internal/telemetry/phase_tracker.go b/internal/telemetry/phase_tracker.go new file mode 100644 index 0000000..aaf418d --- /dev/null +++ b/internal/telemetry/phase_tracker.go @@ -0,0 +1,136 @@ +package telemetry + +import ( + "sync" + "time" +) + +// PhaseCompletion records a single analysis phase that ran to completion. +// The list of these forms phases_completed in the run-status payload. +type PhaseCompletion struct { + Name string `json:"name"` + FinishedAt int64 `json:"finished_at"` + DurationMs int64 `json:"duration_ms"` +} + +// RunStatusInfo is the structured progress snapshot sent on each phase +// boundary and on every heartbeat tick. The same struct is embedded on the +// final telemetry Payload so a stored telemetry record is self-describing +// without joining to the run-status table. +// +// In-flight sub-progress (set via PhaseTracker.UpdateDetail) is folded into +// CurrentPhase as " ()" rather than carried in a separate +// field, so older backends that don't know about phase-detail still surface +// the string verbatim. Completed phases keep their base name only. +type RunStatusInfo struct { + PhasesCompleted []PhaseCompletion `json:"phases_completed,omitempty"` + CurrentPhase string `json:"current_phase,omitempty"` + ElapsedMs int64 `json:"elapsed_ms"` +} + +// PhaseTracker accumulates phase lifecycle events for a single telemetry +// run. The heartbeat goroutine and the main scan goroutine both touch it +// concurrently — Snapshot returns a defensive copy so the caller never +// observes a torn slice while a phase is appended. +type PhaseTracker struct { + mu sync.Mutex + startedAt time.Time + phaseStartedAt time.Time + currentPhase string + currentPhaseDetail string + completed []PhaseCompletion + now func() time.Time // overridable for tests +} + +// NewPhaseTracker constructs a tracker anchored at the current time. +func NewPhaseTracker() *PhaseTracker { + return newPhaseTrackerWithClock(time.Now) +} + +func newPhaseTrackerWithClock(now func() time.Time) *PhaseTracker { + return &PhaseTracker{ + startedAt: now(), + now: now, + } +} + +// Start records the beginning of a new phase. Calling Start while another +// phase is already in flight implicitly finishes the previous one — this +// keeps call sites tidy when phases run back-to-back without a Finish in +// between. Detail from the previous phase is cleared so it never leaks +// into the new one. +func (t *PhaseTracker) Start(phase string) { + t.mu.Lock() + defer t.mu.Unlock() + + if t.currentPhase != "" { + t.finishLocked() + } + t.currentPhase = phase + t.currentPhaseDetail = "" + t.phaseStartedAt = t.now() +} + +// Finish records completion of the current phase. No-op when nothing is +// in flight — safe to defer. +func (t *PhaseTracker) Finish() { + t.mu.Lock() + defer t.mu.Unlock() + t.finishLocked() +} + +func (t *PhaseTracker) finishLocked() { + if t.currentPhase == "" { + return + } + finishedAt := t.now() + t.completed = append(t.completed, PhaseCompletion{ + Name: t.currentPhase, + FinishedAt: finishedAt.Unix(), + DurationMs: finishedAt.Sub(t.phaseStartedAt).Milliseconds(), + }) + t.currentPhase = "" + t.currentPhaseDetail = "" +} + +// UpdateDetail sets a free-form sub-progress string for the current +// phase ("project 12 of 47", "scanning pip3", ...). No-op when no phase +// is in flight — keeps call sites tidy when a scanner reports progress +// from inside a goroutine that may outlive its enclosing Start/Finish. +func (t *PhaseTracker) UpdateDetail(detail string) { + t.mu.Lock() + defer t.mu.Unlock() + if t.currentPhase == "" { + return + } + t.currentPhaseDetail = detail +} + +// Snapshot returns a copy of the tracker state safe for marshalling on +// another goroutine. The returned slice is independent of the tracker's +// internal buffer. +// +// When the in-flight phase has a detail set, it's folded into CurrentPhase +// as " ()" so the wire format stays flat — older backends +// without a dedicated detail field still render the progress verbatim. +// PhasesCompleted entries keep their base name; detail is per-tick state, +// not a permanent label. +func (t *PhaseTracker) Snapshot() RunStatusInfo { + t.mu.Lock() + defer t.mu.Unlock() + + current := t.currentPhase + if current != "" && t.currentPhaseDetail != "" { + current = current + " (" + t.currentPhaseDetail + ")" + } + + out := RunStatusInfo{ + CurrentPhase: current, + ElapsedMs: t.now().Sub(t.startedAt).Milliseconds(), + } + if len(t.completed) > 0 { + out.PhasesCompleted = make([]PhaseCompletion, len(t.completed)) + copy(out.PhasesCompleted, t.completed) + } + return out +} diff --git a/internal/telemetry/phase_tracker_test.go b/internal/telemetry/phase_tracker_test.go new file mode 100644 index 0000000..318a312 --- /dev/null +++ b/internal/telemetry/phase_tracker_test.go @@ -0,0 +1,210 @@ +package telemetry + +import ( + "sync" + "testing" + "time" +) + +// fakeClock returns deterministic, monotonically advancing timestamps so +// duration math in the tracker is checkable without sleeping. +type fakeClock struct { + mu sync.Mutex + cur time.Time + step time.Duration +} + +func (f *fakeClock) now() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + t := f.cur + f.cur = f.cur.Add(f.step) + return t +} + +func TestPhaseTracker_StartFinishRecordsCompletion(t *testing.T) { + clk := &fakeClock{cur: time.Unix(1_700_000_000, 0), step: time.Second} + pt := newPhaseTrackerWithClock(clk.now) + + pt.Start("device_info") // t=0 + pt.Finish() // t=1 + + snap := pt.Snapshot() // t=2 — advances clock for elapsed_ms + if len(snap.PhasesCompleted) != 1 { + t.Fatalf("phases_completed = %d, want 1", len(snap.PhasesCompleted)) + } + got := snap.PhasesCompleted[0] + if got.Name != "device_info" { + t.Errorf("name = %q, want device_info", got.Name) + } + if got.DurationMs != 1000 { + t.Errorf("duration_ms = %d, want 1000", got.DurationMs) + } + if snap.CurrentPhase != "" { + t.Errorf("current_phase = %q, want empty after Finish", snap.CurrentPhase) + } +} + +func TestPhaseTracker_StartImplicitlyFinishesPrevious(t *testing.T) { + clk := &fakeClock{cur: time.Unix(1_700_000_000, 0), step: time.Second} + pt := newPhaseTrackerWithClock(clk.now) + + pt.Start("ide_scan") // t=0 + pt.Start("extension_scan") // t=1 — implicit finish of ide_scan + pt.Finish() // t=2 + + snap := pt.Snapshot() + if len(snap.PhasesCompleted) != 2 { + t.Fatalf("phases_completed = %d, want 2", len(snap.PhasesCompleted)) + } + if snap.PhasesCompleted[0].Name != "ide_scan" || snap.PhasesCompleted[1].Name != "extension_scan" { + t.Errorf("unexpected order: %+v", snap.PhasesCompleted) + } +} + +func TestPhaseTracker_SnapshotIncludesInFlightPhase(t *testing.T) { + clk := &fakeClock{cur: time.Unix(1_700_000_000, 0), step: time.Second} + pt := newPhaseTrackerWithClock(clk.now) + + pt.Start("brew_scan") // t=0 + + snap := pt.Snapshot() // t=1 — snapshot mid-phase + if snap.CurrentPhase != "brew_scan" { + t.Errorf("current_phase = %q, want brew_scan", snap.CurrentPhase) + } + if len(snap.PhasesCompleted) != 0 { + t.Errorf("phases_completed = %d, want 0 while phase still running", len(snap.PhasesCompleted)) + } + if snap.ElapsedMs < 1000 { + t.Errorf("elapsed_ms = %d, want ≥ 1000", snap.ElapsedMs) + } +} + +func TestPhaseTracker_FinishNoOpWithoutStart(t *testing.T) { + pt := NewPhaseTracker() + pt.Finish() // no panic + if got := pt.Snapshot(); len(got.PhasesCompleted) != 0 || got.CurrentPhase != "" { + t.Errorf("after Finish-without-Start: %+v, want zero state", got) + } +} + +func TestPhaseTracker_SnapshotIsDefensiveCopy(t *testing.T) { + clk := &fakeClock{cur: time.Unix(1_700_000_000, 0), step: time.Second} + pt := newPhaseTrackerWithClock(clk.now) + + pt.Start("a") + pt.Finish() + snap := pt.Snapshot() + + // Mutating the snapshot must not affect the tracker's internal state. + if len(snap.PhasesCompleted) > 0 { + snap.PhasesCompleted[0].Name = "tampered" + } + + again := pt.Snapshot() + if again.PhasesCompleted[0].Name != "a" { + t.Errorf("internal phase name was tampered via snapshot: got %q", + again.PhasesCompleted[0].Name) + } +} + +func TestPhaseTracker_UpdateDetailFoldsIntoCurrentPhase(t *testing.T) { + pt := NewPhaseTracker() + + // Detail before Start is a no-op — keeps callers from leaking stale + // per-phase strings into the next phase. + pt.UpdateDetail("dropped") + if got := pt.Snapshot(); got.CurrentPhase != "" { + t.Errorf("detail before Start should be ignored, got %q", got.CurrentPhase) + } + + pt.Start("brew_scan") + if got := pt.Snapshot(); got.CurrentPhase != "brew_scan" { + t.Errorf("no detail yet: current_phase = %q, want %q", got.CurrentPhase, "brew_scan") + } + + pt.UpdateDetail("fetching formulae") + if got := pt.Snapshot(); got.CurrentPhase != "brew_scan (fetching formulae)" { + t.Errorf("with detail: current_phase = %q, want %q", + got.CurrentPhase, "brew_scan (fetching formulae)") + } + + pt.UpdateDetail("fetching casks") + if got := pt.Snapshot(); got.CurrentPhase != "brew_scan (fetching casks)" { + t.Errorf("detail should overwrite, got %q", got.CurrentPhase) + } +} + +func TestPhaseTracker_StartClearsPreviousDetail(t *testing.T) { + pt := NewPhaseTracker() + + pt.Start("a") + pt.UpdateDetail("a-detail") + pt.Start("b") // implicit finish of a; detail must reset + if got := pt.Snapshot(); got.CurrentPhase != "b" { + t.Errorf("detail should reset on next phase, got current_phase = %q", got.CurrentPhase) + } +} + +func TestPhaseTracker_FinishClearsDetail(t *testing.T) { + pt := NewPhaseTracker() + + pt.Start("a") + pt.UpdateDetail("in-flight") + pt.Finish() + if got := pt.Snapshot(); got.CurrentPhase != "" { + t.Errorf("detail should clear on Finish, got current_phase = %q", got.CurrentPhase) + } +} + +// Completed phases keep their base name only — detail is per-tick state, +// not a permanent label baked into history. +func TestPhaseTracker_CompletedPhasesKeepBaseName(t *testing.T) { + pt := NewPhaseTracker() + + pt.Start("node_scan") + pt.UpdateDetail("project 5 of 10") + pt.Finish() + + snap := pt.Snapshot() + if len(snap.PhasesCompleted) != 1 { + t.Fatalf("phases_completed = %d, want 1", len(snap.PhasesCompleted)) + } + if snap.PhasesCompleted[0].Name != "node_scan" { + t.Errorf("completed name = %q, want bare %q without detail", + snap.PhasesCompleted[0].Name, "node_scan") + } +} + +func TestPhaseTracker_ConcurrentReadDuringWrite(t *testing.T) { + // Race detector must report clean. Spawn a writer that flips phases and + // a reader (mimicking the heartbeat goroutine) that snapshots repeatedly. + pt := NewPhaseTracker() + + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for range 1000 { + pt.Start("phase") + pt.Finish() + } + close(stop) + }() + + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = pt.Snapshot() + } + } + }() + + wg.Wait() +} diff --git a/internal/telemetry/progress_hook_test.go b/internal/telemetry/progress_hook_test.go new file mode 100644 index 0000000..3ced453 --- /dev/null +++ b/internal/telemetry/progress_hook_test.go @@ -0,0 +1,33 @@ +package telemetry + +import ( + "testing" +) + +// Verifies the wiring contract telemetry.Run() relies on: a ProgressHook +// closure that calls tracker.UpdateDetail surfaces the detail through the +// next Snapshot's CurrentPhase field (folded as " ()"). +// Keeps the scanner / tracker integration honest even when the real +// scanner tests live in package detector. +func TestProgressHook_PlumbsDetailIntoSnapshot(t *testing.T) { + tracker := NewPhaseTracker() + tracker.Start("node_scan") + + // Simulate the closure telemetry.Run() installs on NodeScanner.ProgressHook. + hook := func(detail string) { tracker.UpdateDetail(detail) } + + hook("project 1 of 47") + if got := tracker.Snapshot().CurrentPhase; got != "node_scan (project 1 of 47)" { + t.Fatalf("after hook: current_phase = %q, want %q", got, "node_scan (project 1 of 47)") + } + + hook("project 47 of 47") + if got := tracker.Snapshot().CurrentPhase; got != "node_scan (project 47 of 47)" { + t.Fatalf("after second hook call: current_phase = %q", got) + } + + tracker.Finish() + if got := tracker.Snapshot().CurrentPhase; got != "" { + t.Fatalf("after Finish: current_phase = %q, want empty", got) + } +} diff --git a/internal/telemetry/run_status.go b/internal/telemetry/run_status.go index dce7962..f21b792 100644 --- a/internal/telemetry/run_status.go +++ b/internal/telemetry/run_status.go @@ -25,19 +25,43 @@ const ( // visibility — we retry harder so a single transient network blip // does not lose the signal that the run was attempted. "failed" // fires during shutdown, so one retry covers the common case. - runStatusStartedAttempts = 3 - runStatusFailedAttempts = 2 - runStatusRetryBackoff = 500 * time.Millisecond + runStatusStartedAttempts = 3 + runStatusFailedAttempts = 2 + runStatusProgressAttempts = 2 + runStatusRetryBackoff = 500 * time.Millisecond + + // runStatusHeartbeatInterval is how often the telemetry run posts a + // status_info snapshot while a scan is in flight. Phase-boundary posts + // fire on top of this so a fast run still surfaces phase completions + // without waiting for the next tick. + runStatusHeartbeatInterval = 5 * time.Minute ) +// runStatusBody is the JSON shape posted to /telemetry/run-status. Fields +// marked omitempty are unset for terminal posts; status_info is only sent +// on progress updates (status == "started" with phase data attached). +type runStatusBody struct { + ExecutionID string `json:"execution_id"` + DeviceID string `json:"device_id"` + Status string `json:"status"` + AgentVersion string `json:"agent_version"` + Platform string `json:"platform"` + InvocationMethod string `json:"invocation_method,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` + StatusInfo *RunStatusInfo `json:"status_info,omitempty"` +} + // reportRunStatus POSTs a lifecycle transition to the backend with a small // retry budget. Never returns an error: running the scan is the priority. // // status must be "started" or "failed". Passing "succeeded" (or any other // value) is a defensive no-op — success is written by the backend worker -// after it persists the uploaded telemetry. +// after it persists the uploaded telemetry. invocationMethod identifies +// whether this run was triggered by an installed scheduler or a manual CLI +// invocation; an empty string is tolerated (the backend treats it as +// "unknown") so this stays callable from contexts that haven't detected it. func reportRunStatus(ctx context.Context, log *progress.Logger, - executionID, deviceID, status, errMsg string) { + executionID, deviceID, status, errMsg, invocationMethod string) { if !config.IsEnterpriseMode() { return @@ -49,12 +73,13 @@ func reportRunStatus(ctx context.Context, log *progress.Logger, return } - payload := map[string]string{ - "execution_id": executionID, - "device_id": deviceID, - "status": status, - "agent_version": buildinfo.Version, - "platform": runtime.GOOS, + body := runStatusBody{ + ExecutionID: executionID, + DeviceID: deviceID, + Status: status, + AgentVersion: buildinfo.Version, + Platform: runtime.GOOS, + InvocationMethod: invocationMethod, } if status == runStatusFailed { if errMsg == "" { @@ -64,10 +89,48 @@ func reportRunStatus(ctx context.Context, log *progress.Logger, if len(errMsg) > runStatusMaxErrorChars { errMsg = errMsg[:runStatusMaxErrorChars] } - payload["error_message"] = errMsg + body.ErrorMessage = errMsg + } + + attempts := runStatusFailedAttempts + if status == runStatusStarted { + attempts = runStatusStartedAttempts + } + postRunStatusWithRetry(ctx, log, body, attempts) +} + +// postProgress sends an idempotent in-flight progress snapshot. The backend +// treats this as status=started with status_info populated — it upserts the +// progress fields without touching the row's terminal state. Best-effort: a +// dropped heartbeat is recovered by the next tick five minutes later, so we +// keep the retry budget low (matching "failed") and never block the scan. +func postProgress(ctx context.Context, log *progress.Logger, + executionID, deviceID, invocationMethod string, info RunStatusInfo) { + + if !config.IsEnterpriseMode() { + return + } + if executionID == "" { + return + } + + infoCopy := info // RunStatusInfo is a struct of slice + scalars; copy is cheap + body := runStatusBody{ + ExecutionID: executionID, + DeviceID: deviceID, + Status: runStatusStarted, // backend distinguishes progress vs terminal by presence of status_info + AgentVersion: buildinfo.Version, + Platform: runtime.GOOS, + InvocationMethod: invocationMethod, + StatusInfo: &infoCopy, } + postRunStatusWithRetry(ctx, log, body, runStatusProgressAttempts) +} - body, err := json.Marshal(payload) +func postRunStatusWithRetry(ctx context.Context, log *progress.Logger, + body runStatusBody, attempts int) { + + encoded, err := json.Marshal(body) if err != nil { log.Progress("run-status: marshal error: %v", err) return @@ -76,11 +139,6 @@ func reportRunStatus(ctx context.Context, log *progress.Logger, endpoint := fmt.Sprintf("%s/v1/%s/developer-mdm-agent/telemetry/run-status", config.APIEndpoint, config.CustomerID) - attempts := runStatusFailedAttempts - if status == runStatusStarted { - attempts = runStatusStartedAttempts - } - for i := 1; i <= attempts; i++ { if i > 1 { // Fixed short backoff. Keeps the total time budget bounded so @@ -88,11 +146,16 @@ func reportRunStatus(ctx context.Context, log *progress.Logger, select { case <-time.After(runStatusRetryBackoff): case <-ctx.Done(): - log.Progress("run-status: parent context done, abandoning retries") + // Demoted to Debug: parent ctx done means clean shutdown + // (or operator-initiated cancel that already logged its + // own reason). No need to add a second progress-level + // noise line in the normal "scan finished successfully, + // last progress post got cut off by cancelRun()" flow. + log.Debug("run-status: parent context done, abandoning retries") return } } - if postRunStatusOnce(ctx, log, endpoint, body, status, i, attempts) { + if postRunStatusOnce(ctx, log, endpoint, encoded, body.Status, i, attempts) { return } } @@ -101,9 +164,22 @@ func reportRunStatus(ctx context.Context, log *progress.Logger, // postRunStatusOnce performs a single HTTP attempt. Returns true on a 2xx // or 4xx (terminal — retrying a bad request will not help). Returns false // on transport errors or 5xx so the caller can retry. +// +// Treats parent-ctx cancellation as terminal-and-quiet: the only time we +// observe ctx.Err() != nil here is shutdown (cancelRun fired). Logging a +// "POST error: context canceled" at progress level on every successful +// scan — because the final phase-boundary post raced the deferred +// cancelRun — would mislead operators into thinking the run failed. func postRunStatusOnce(ctx context.Context, log *progress.Logger, endpoint string, body []byte, status string, attempt, maxAttempts int) bool { + // Short-circuit when the scan ctx is already done — no point opening a + // connection just to have it cancelled mid-handshake. + if ctx.Err() != nil { + log.Debug("run-status[%s %d/%d]: skipped (parent context done)", status, attempt, maxAttempts) + return true + } + cctx, cancel := context.WithTimeout(ctx, runStatusHTTPTimeout) defer cancel() @@ -119,6 +195,14 @@ func postRunStatusOnce(ctx context.Context, log *progress.Logger, client := &http.Client{Timeout: runStatusHTTPTimeout} resp, err := client.Do(req) if err != nil { + // Distinguish shutdown-induced cancellation from a real transport + // error. If the parent ctx is done, this was clean shutdown — log + // at debug and return terminal so the caller doesn't burn a retry + // on an already-cancelled context. + if ctx.Err() != nil { + log.Debug("run-status[%s %d/%d]: aborted at shutdown: %v", status, attempt, maxAttempts, err) + return true + } log.Progress("run-status[%s %d/%d]: POST error: %v", status, attempt, maxAttempts, err) return false } diff --git a/internal/telemetry/run_status_test.go b/internal/telemetry/run_status_test.go index 8349074..57f2407 100644 --- a/internal/telemetry/run_status_test.go +++ b/internal/telemetry/run_status_test.go @@ -37,7 +37,7 @@ func TestReportRunStatus_StartedRetriesOn5xx(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "", "") if got := atomic.LoadInt32(&calls); got != int32(runStatusStartedAttempts) { t.Fatalf("expected %d retries on 5xx, got %d", runStatusStartedAttempts, got) @@ -54,7 +54,7 @@ func TestReportRunStatus_StartedStopsAfter2xx(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "", "") if got := atomic.LoadInt32(&calls); got != 1 { t.Fatalf("expected exactly 1 call on 2xx, got %d", got) @@ -72,7 +72,7 @@ func TestReportRunStatus_DoesNotRetryOn4xx(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "", "") if got := atomic.LoadInt32(&calls); got != 1 { t.Fatalf("expected 1 call for 4xx (no retry), got %d", got) @@ -89,7 +89,7 @@ func TestReportRunStatus_FailedRetriesOn5xx(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusFailed, "boom") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusFailed, "boom", "") if got := atomic.LoadInt32(&calls); got != int32(runStatusFailedAttempts) { t.Fatalf("expected %d retries on 5xx for failed, got %d", runStatusFailedAttempts, got) @@ -107,7 +107,7 @@ func TestReportRunStatus_FailedIncludesErrorMessage(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusFailed, "context deadline exceeded") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusFailed, "context deadline exceeded", "") if gotBody["status"] != runStatusFailed { t.Errorf("status = %q, want %q", gotBody["status"], runStatusFailed) @@ -129,8 +129,8 @@ func TestReportRunStatus_SkipsSucceededAndUnknownStatus(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", "succeeded", "") - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", "cancelled", "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", "succeeded", "", "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", "cancelled", "", "") if got := atomic.LoadInt32(&calls); got != 0 { t.Fatalf("expected zero HTTP calls for non-agent statuses, got %d", got) @@ -151,7 +151,7 @@ func TestReportRunStatus_SkipsWhenNotEnterprise(t *testing.T) { defer func() { config.APIKey = savedKey }() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "") + reportRunStatus(context.Background(), log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "", "") if got := atomic.LoadInt32(&calls); got != 0 { t.Fatalf("expected zero calls when not in enterprise mode, got %d", got) @@ -167,7 +167,7 @@ func TestReportRunStatus_SkipsEmptyExecutionID(t *testing.T) { defer withEnterpriseConfig(t, srv.URL)() log := progress.NewLogger(progress.LevelInfo) - reportRunStatus(context.Background(), log, "", "dev-1", runStatusStarted, "") + reportRunStatus(context.Background(), log, "", "dev-1", runStatusStarted, "", "") if got := atomic.LoadInt32(&calls); got != 0 { t.Fatalf("expected zero calls when execution_id is empty, got %d", got) @@ -192,7 +192,7 @@ func TestReportRunStatus_AbortsRetriesOnCtxCancel(t *testing.T) { done := make(chan struct{}) start := time.Now() go func() { - reportRunStatus(ctx, log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "") + reportRunStatus(ctx, log, "11111111-2222-4333-8444-555555555555", "dev-1", runStatusStarted, "", "") close(done) }() @@ -213,3 +213,110 @@ func TestReportRunStatus_AbortsRetriesOnCtxCancel(t *testing.T) { func int64Attempts() time.Duration { return time.Duration(runStatusStartedAttempts) } + +func TestReportRunStatus_IncludesInvocationMethod(t *testing.T) { + // invocation_method must round-trip on the wire so the backend can + // distinguish installed-agent runs from manual CLI runs. + var gotBody runStatusBody + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &gotBody) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + defer withEnterpriseConfig(t, srv.URL)() + + log := progress.NewLogger(progress.LevelInfo) + reportRunStatus(context.Background(), log, + "11111111-2222-4333-8444-555555555555", "dev-1", + runStatusStarted, "", InvocationInstall) + + if gotBody.InvocationMethod != InvocationInstall { + t.Errorf("invocation_method = %q, want %q", gotBody.InvocationMethod, InvocationInstall) + } + if gotBody.StatusInfo != nil { + t.Errorf("status_info should be nil on plain started post, got %+v", gotBody.StatusInfo) + } +} + +func TestReportRunStatus_OmitsInvocationMethodWhenEmpty(t *testing.T) { + // Empty invocation_method must be omitted from the wire so older agents + // that don't detect it land identical bytes to before this change. + var raw map[string]any + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &raw) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + defer withEnterpriseConfig(t, srv.URL)() + + log := progress.NewLogger(progress.LevelInfo) + reportRunStatus(context.Background(), log, + "11111111-2222-4333-8444-555555555555", "dev-1", + runStatusStarted, "", "") + + if _, ok := raw["invocation_method"]; ok { + t.Errorf("invocation_method should be omitted when empty, got body: %+v", raw) + } +} + +func TestPostProgress_SendsStatusInfo(t *testing.T) { + var gotBody runStatusBody + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &gotBody) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + defer withEnterpriseConfig(t, srv.URL)() + + info := RunStatusInfo{ + PhasesCompleted: []PhaseCompletion{ + {Name: "device_info", FinishedAt: 1_700_000_001, DurationMs: 1000}, + {Name: "ide_scan", FinishedAt: 1_700_000_005, DurationMs: 4000}, + }, + CurrentPhase: "brew_scan", + ElapsedMs: 7000, + } + + log := progress.NewLogger(progress.LevelInfo) + postProgress(context.Background(), log, + "11111111-2222-4333-8444-555555555555", "dev-1", + InvocationInstall, info) + + if gotBody.Status != runStatusStarted { + t.Errorf("status = %q, want %q (progress posts ride on started)", gotBody.Status, runStatusStarted) + } + if gotBody.InvocationMethod != InvocationInstall { + t.Errorf("invocation_method = %q, want %q", gotBody.InvocationMethod, InvocationInstall) + } + if gotBody.StatusInfo == nil { + t.Fatal("status_info missing from progress post") + } + if gotBody.StatusInfo.CurrentPhase != "brew_scan" { + t.Errorf("current_phase = %q, want brew_scan", gotBody.StatusInfo.CurrentPhase) + } + if len(gotBody.StatusInfo.PhasesCompleted) != 2 { + t.Errorf("phases_completed = %d, want 2", len(gotBody.StatusInfo.PhasesCompleted)) + } + if gotBody.StatusInfo.ElapsedMs != 7000 { + t.Errorf("elapsed_ms = %d, want 7000", gotBody.StatusInfo.ElapsedMs) + } +} + +func TestPostProgress_SkipsEmptyExecutionID(t *testing.T) { + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + })) + defer srv.Close() + defer withEnterpriseConfig(t, srv.URL)() + + log := progress.NewLogger(progress.LevelInfo) + postProgress(context.Background(), log, "", "dev-1", InvocationInstall, RunStatusInfo{}) + + if got := atomic.LoadInt32(&calls); got != 0 { + t.Fatalf("expected zero calls when execution_id is empty, got %d", got) + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index a2449f4..3269912 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "os/signal" + "sync" "sync/atomic" "syscall" "time" @@ -45,6 +46,17 @@ type Payload struct { CollectedAt int64 `json:"collected_at"` NoUserLoggedIn bool `json:"no_user_logged_in"` + // InvocationMethod is "install" when the agent ran from an installed + // launchd/systemd/schtasks unit, "one_time" for a manual CLI run. + // Duplicated on this struct (also lives on the run-status row) so the + // stored telemetry record is self-describing for backfills. + InvocationMethod string `json:"invocation_method,omitempty"` + + // StatusInfo carries the final phase completion list and total elapsed + // time the agent saw at upload time. Snapshot of the same RunStatusInfo + // streamed via the run-status endpoint during the run. + StatusInfo *RunStatusInfo `json:"status_info,omitempty"` + IDEExtensions []model.Extension `json:"ide_extensions"` IDEInstallations []model.IDE `json:"ide_installations"` NodePkgManagers []model.PkgManager `json:"node_package_managers"` @@ -97,9 +109,26 @@ type PerformanceMetrics struct { // [scanning] Device ID (Serial): ... // ... func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err error) { - ctx := context.Background() + // cancelRun signals the heartbeat goroutine to exit. The post-goroutine + // defer below does cancel-then-wait so heartbeat shutdown is clean on + // the happy path. This top-level defer is a safety net for early + // returns (lock-acquire failure, etc.) that bail before the goroutine + // is even spawned — in that case cancelRun must still run so the ctx + // is released. Double-cancel on the normal path is a no-op. + ctx, cancelRun := context.WithCancel(context.Background()) + defer cancelRun() startTime := time.Now() + // Detect invocation method once at run start: "install" if the platform's + // scheduler footprint is on disk, else "one_time". Threaded into every + // run-status post and stamped on the final payload. + invocationMethod := DetectInvocationMethod() + + // Phase tracker accumulates per-analysis-section completions so the + // backend can surface in-flight progress on the console. Reads from the + // heartbeat goroutine are mutex-guarded inside Snapshot. + tracker := NewPhaseTracker() + // Generate a per-run execution ID up front so failures before device.Gather // can still be attributed. Fall back to a timestamp-derived ID if crypto/rand // errors (vanishingly unlikely) — reporting is best-effort and must never @@ -121,8 +150,80 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var reportedFailed atomic.Bool reportFailedOnce := func(errMsg string) { if reportedFailed.CompareAndSwap(false, true) { - reportRunStatus(context.Background(), log, executionID, deviceID, runStatusFailed, errMsg) + reportRunStatus(context.Background(), log, executionID, deviceID, runStatusFailed, errMsg, invocationMethod) + } + } + + // Phase-boundary progress posts run on a dedicated worker so the scan + // never blocks on HTTP at a call site. Buffer=1 + drop-oldest send + // gives us two properties together: + // - Strict ordering: a single consumer means the backend can never + // see an older snapshot land after a newer one (which would cause + // the console UI to briefly regress on degraded networks). + // - Bounded resources: at most one pending snapshot is queued; a + // slow-network backlog can't grow across the 11+ inline call + // sites. The latest snapshot always wins, which matches what an + // operator watching progress actually cares about. + // + // Without this, blocking phase posts could add the per-call retry + // budget (~6s: 2 attempts × 3s HTTP timeout + 500ms backoff) to each + // call site, compounding to over a minute of added scan latency on a + // degraded link for purely best-effort progress data. + phaseCh := make(chan RunStatusInfo, 1) + phaseDone := make(chan struct{}) + var phaseSendMu sync.Mutex // serialises drain+send so concurrent producers (main scan + heartbeat) don't race + go func() { + defer close(phaseDone) + // process posts one snapshot using a Background-derived ctx with + // a bounded per-post timeout. We deliberately do NOT chain off the + // scan ctx here: the final phase-boundary post (which is the only + // snapshot that includes "telemetry_upload" in phases_completed) + // arrives at the worker *after* the function body returns and the + // deferred cancelRun() fires. If we shared the scan ctx, that post + // would always be cancelled mid-flight and the backend would never + // learn the upload completed. The 10s budget covers postProgress's + // own internal retry window (2×3s + 500ms backoff) with slack. + process := func(snap RunStatusInfo) { + postCtx, postCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer postCancel() + postProgress(postCtx, log, executionID, deviceID, invocationMethod, snap) + } + for { + select { + case snap := <-phaseCh: + process(snap) + case <-ctx.Done(): + // Drain any queued snapshot before exiting. Without this, + // a naïve select on the next iteration would 50/50 between + // the ready ctx.Done() and the ready phaseCh — dropping + // the final post is exactly what the user reports as + // "telemetry_upload missing from phases_completed". + for { + select { + case snap := <-phaseCh: + process(snap) + default: + return + } + } + } + } + }() + + // postPhase is the convergence point for phase-boundary and heartbeat + // progress updates. Captured here so the heartbeat goroutine and the + // inline phase wrappers share a single call site. + postPhase := func() { + snap := tracker.Snapshot() + phaseSendMu.Lock() + defer phaseSendMu.Unlock() + // Drop any queued (older) snapshot so the freshest one always lands. + select { + case <-phaseCh: + default: } + // Always succeeds: buffer=1, just drained, single sender under the mutex. + phaseCh <- snap } // Catch SIGINT / SIGTERM so cancellation (Ctrl+C, launchd stop, kill) @@ -190,7 +291,9 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err }() log.Progress("Lock acquired (PID: %d)", os.Getpid()) - // Device info + // Device info — first tracked phase. Completes before the "started" + // post so the first heartbeat already includes it in phases_completed. + tracker.Start("device_info") log.Progress("Gathering device information...") dev := device.Gather(ctx, exec) deviceID = dev.SerialNumber @@ -214,9 +317,43 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err if noUserLoggedIn { log.Warn("no real developer identity (UserIdentity=%q, root=%v) — telemetry will be marked no_user_logged_in", dev.UserIdentity, exec.IsRoot()) } + tracker.Finish() // Report "started" now that we have a device_id. Fire-and-forget. - reportRunStatus(ctx, log, executionID, deviceID, runStatusStarted, "") + reportRunStatus(ctx, log, executionID, deviceID, runStatusStarted, "", invocationMethod) + + // First progress upsert: surfaces device_info completion immediately + // without waiting for the 5-minute heartbeat. Safe to call after the + // "started" post because the backend now has a row to upsert into. + postPhase() + + // Heartbeat goroutine: pushes status_info on a ticker so a long-running + // phase (brew on a 200k-package macbook, syspkg on a fat dpkg machine) + // still surfaces progress to the console between phase boundaries. + heartbeatDone := make(chan struct{}) + go func() { + defer close(heartbeatDone) + ticker := time.NewTicker(runStatusHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + postPhase() + } + } + }() + // Shut down both the heartbeat goroutine and the phase-post worker + // cleanly on return. Order matters: cancel first so both goroutines + // see ctx.Done() and exit, THEN wait for each to close its done + // channel. Splitting these into separate `defer` statements would + // deadlock — LIFO would block on the waits before cancel fires. + defer func() { + cancelRun() + <-heartbeatDone + <-phaseDone + }() // Detect logged-in user for running commands as the real user when root. // Skip "root" — if LoggedInUser() fell back to CurrentUser(), delegating @@ -244,6 +381,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err fmt.Fprintln(os.Stderr) // Detect IDEs + tracker.Start("ide_scan") log.Progress("Detecting IDE and AI desktop app installations...") ideDetector := detector.NewIDEDetector(exec) ides := ideDetector.Detect(ctx) @@ -254,8 +392,11 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress(" No IDEs or AI desktop apps found") } fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() // Collect extensions + tracker.Start("extension_scan") log.Progress("Scanning extensions...") extDetector := detector.NewExtensionDetector(exec) extensions := extDetector.Detect(ctx, searchDirs, ides) @@ -272,8 +413,13 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err } log.Progress("Found total of %d IDE extensions", len(extensions)) fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() - // Detect AI tools + // Detect AI tools — CLI + general agents + frameworks roll up into one + // phase since they're all quick discovery passes against the same user + // home and exec PATH. + tracker.Start("ai_tools_scan") log.Progress("Detecting AI agents and tools...") fmt.Fprintln(os.Stderr) @@ -312,8 +458,11 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err fmt.Fprintln(os.Stderr) allAI := append(append(cliTools, agents...), frameworks...) + tracker.Finish() + postPhase() // MCP configs + tracker.Start("mcp_config_scan") log.Progress("Collecting MCP configuration files...") mcpDetector := detector.NewMCPDetector(exec) mcpConfigs := mcpDetector.DetectEnterprise(ctx) @@ -326,6 +475,8 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Debug("scan totals: ides=%d extensions=%d ai_cli=%d agents=%d frameworks=%d mcp_configs=%d", len(ides), len(extensions), len(cliTools), len(agents), len(frameworks), len(mcpConfigs)) fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() // Homebrew scanning brewEnabled := true @@ -338,6 +489,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var brewFormulae, brewCasks []model.BrewPackage if brewEnabled { + tracker.Start("brew_scan") log.Progress("Detecting Homebrew...") brewDetector := detector.NewBrewDetector(userExec) brewPkgMgr = brewDetector.DetectBrew(ctx) @@ -363,6 +515,8 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress(" Homebrew not found") } fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() } else { log.Progress("Homebrew scanning is DISABLED") fmt.Fprintln(os.Stderr) @@ -379,6 +533,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var pythonProjects []model.ProjectInfo if pythonEnabled { + tracker.Start("python_scan") log.Progress("Detecting Python package managers...") pyDetector := detector.NewPythonPMDetector(userExec) pythonPkgManagers = pyDetector.DetectManagers(ctx) @@ -391,6 +546,10 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress("Scanning Python global packages...") pyScanner := detector.NewPythonScanner(userExec, log) + // Stream per-PM sub-progress ("scanning pip3" / "scanning conda" / + // "scanning uv") into the phase tracker so heartbeats surface where + // inside the python phase a slow pip3 list is stuck. + pyScanner.ProgressHook = func(detail string) { tracker.UpdateDetail(detail) } pythonGlobalPkgs = pyScanner.ScanGlobalPackages(ctx) log.Progress(" Found %d Python global package source(s)", len(pythonGlobalPkgs)) @@ -399,6 +558,8 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err pythonProjects = pyProjectDetector.ListProjects(searchDirs) log.Progress(" Found %d Python projects", len(pythonProjects)) fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() } else { log.Progress("Python scanning is DISABLED") fmt.Fprintln(os.Stderr) @@ -408,6 +569,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var systemPackageScans []model.SystemPackageScanResult if exec.GOOS() == model.PlatformLinux { + tracker.Start("syspkg_scan") log.Progress("Detecting system packages...") sysPkgDetector := detector.NewSystemPkgDetector(userExec) @@ -460,6 +622,8 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress(" No system package managers found") } fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() } else { log.Progress("System package scanning: skipped (non-Linux)") fmt.Fprintln(os.Stderr) @@ -477,6 +641,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var nodeScanMs int64 if npmEnabled { + tracker.Start("node_scan") log.Progress("Node.js package scanning is ENABLED") log.Progress("Detecting Node.js package managers...") @@ -489,6 +654,10 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress("Scanning globally installed packages...") nodeScanner := detector.NewNodeScanner(exec, log, loggedInUsername) + // Stream sub-progress so heartbeats show "project 12 of 47" / + // "global: yarn" during the long-running node phase. Both + // ScanGlobalPackages and ScanProjects share this hook. + nodeScanner.ProgressHook = func(detail string) { tracker.UpdateDetail(detail) } globalPkgs = nodeScanner.ScanGlobalPackages(ctx) log.Progress(" Found %d global package location(s)", len(globalPkgs)) fmt.Fprintln(os.Stderr) @@ -500,6 +669,8 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress(" Found %d Node.js projects", len(nodeProjects)) log.Progress(" Scan duration: %dms", nodeScanMs) fmt.Fprintln(os.Stderr) + tracker.Finish() + postPhase() } else { log.Progress("Node.js package scanning is DISABLED") fmt.Fprintln(os.Stderr) @@ -547,6 +718,12 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err execLogsBase64 := capture.Finalize() endTime := time.Now() + // Snapshot the final progress state right before we serialize. By this + // point every analysis phase has been Finish()-ed so PhasesCompleted + // holds the full list and CurrentPhase is empty — the upload itself + // runs after this snapshot and is intentionally not tracked as a phase. + finalStatusInfo := tracker.Snapshot() + // Build payload payload := &Payload{ CustomerID: config.CustomerID, @@ -561,6 +738,9 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err CollectedAt: endTime.Unix(), NoUserLoggedIn: noUserLoggedIn, + InvocationMethod: invocationMethod, + StatusInfo: &finalStatusInfo, + IDEExtensions: extensions, IDEInstallations: ides, NodePkgManagers: pkgManagers, @@ -600,11 +780,18 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err }, } - // Upload to S3 + // Upload to S3 — tracked as the final phase. The Payload's StatusInfo + // above is intentionally snapshotted *before* this phase starts (the + // payload can't describe its own upload), so this phase only appears + // on the run-status row via heartbeats and the post-upload progress + // post below. + tracker.Start("telemetry_upload") log.Progress("Requesting upload URL from backend...") - if err := uploadToS3(ctx, log, payload, executionID); err != nil { + if err := uploadToS3(ctx, log, payload, executionID, tracker); err != nil { return fmt.Errorf("uploading telemetry: %w", err) } + tracker.Finish() + postPhase() fmt.Fprintln(os.Stderr) log.Progress("Telemetry collection completed successfully") @@ -637,7 +824,17 @@ func totalSystemPackagesCount(scans []model.SystemPackageScanResult) int { return total } -func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, executionID string) error { +func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, executionID string, tracker *PhaseTracker) error { + // updateDetail forwards sub-progress to the heartbeat goroutine via the + // tracker. Tolerates nil so the function stays callable from tests that + // don't supply a tracker. + updateDetail := func(detail string) { + if tracker != nil { + tracker.UpdateDetail(detail) + } + } + + updateDetail("compressing payload") payloadJSON, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshaling payload: %w", err) @@ -650,6 +847,7 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe if err != nil { return fmt.Errorf("compressing payload: %w", err) } + updateDetail("requesting upload URL") // Request upload URL reqBody, _ := json.Marshal(map[string]any{ @@ -698,6 +896,7 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe uploaded := false var lastFailure string for attempt := 1; attempt <= maxRetries; attempt++ { + updateDetail(fmt.Sprintf("uploading to S3 (attempt %d/%d, %d KiB)", attempt, maxRetries, len(compressedPayload)/1024)) uploadStart := time.Now() putReq, reqErr := http.NewRequestWithContext(ctx, http.MethodPut, urlResp.UploadURL, bytes.NewReader(compressedPayload)) if reqErr != nil { @@ -780,6 +979,7 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe } // Notify backend + updateDetail("notifying backend") log.Progress("Notifying backend of upload...") notifyBody, _ := json.Marshal(map[string]string{ "s3_key": urlResp.S3Key, diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go index 98c614b..d3223cd 100644 --- a/internal/telemetry/telemetry_test.go +++ b/internal/telemetry/telemetry_test.go @@ -98,7 +98,7 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"} const testExecutionID = "11111111-2222-4333-8444-555555555555" - if err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, testExecutionID); err != nil { + if err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, testExecutionID, nil); err != nil { t.Fatalf("uploadToS3 failed: %v", err) } @@ -198,7 +198,7 @@ func TestUploadToS3_Synthetic200ConfirmedByBackend(t *testing.T) { err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, - "11111111-2222-4333-8444-555555555555") + "11111111-2222-4333-8444-555555555555", nil) if err != nil { t.Fatalf("uploadToS3 must succeed when backend confirms uploaded=true, got: %v", err) } @@ -257,7 +257,7 @@ func TestUploadToS3_Synthetic200MissingExhaustsRetries(t *testing.T) { err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, - "11111111-2222-4333-8444-555555555555") + "11111111-2222-4333-8444-555555555555", nil) if err == nil { t.Fatal("uploadToS3 must fail when every confirm reports the object missing") } @@ -313,7 +313,7 @@ func TestUploadToS3_Synthetic200UnsupportedBackendTrustsPUT(t *testing.T) { err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, - "11111111-2222-4333-8444-555555555555") + "11111111-2222-4333-8444-555555555555", nil) if err != nil { t.Fatalf("uploadToS3 must succeed when confirm-upload is unsupported (404), got: %v", err) } @@ -363,7 +363,7 @@ func TestUploadToS3_Synthetic200IndeterminateExhausts(t *testing.T) { err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, - "11111111-2222-4333-8444-555555555555") + "11111111-2222-4333-8444-555555555555", nil) if err == nil { t.Fatal("uploadToS3 must fail when every confirm is indeterminate") } @@ -432,7 +432,7 @@ func TestUploadToS3_Synthetic200ThenRealAWSHeaders(t *testing.T) { err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, - "11111111-2222-4333-8444-555555555555") + "11111111-2222-4333-8444-555555555555", nil) if err != nil { t.Fatalf("uploadToS3 must recover when a later attempt reaches real S3, got: %v", err) }