From 98127fc7b0efe564efd6ef3705c75f839211ce18 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Thu, 2 Apr 2026 17:51:49 +1100 Subject: [PATCH] feat: add weighted fair queuing scheduler Introduces a scheduler with four layered admission constraints: - Total concurrency: hard global cap on running jobs - Priority tiers: weighted share of total concurrency per tier (Priority struct with Level + Weight) - Per-type limits: fraction of tier slots per job type - Conflict groups: mutual exclusion by group + job ID Within a tier, jobs are ordered by accumulated cost per fairness key (lowest first), with cost estimated via EMA of wall time. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/scheduler/README.md | 670 +++++++++++++++++++++++++++ internal/scheduler/metrics.go | 25 + internal/scheduler/scheduler.go | 568 +++++++++++++++++++++++ internal/scheduler/scheduler.svg | 97 ++++ internal/scheduler/scheduler_test.go | 447 ++++++++++++++++++ internal/scheduler/soak_test.go | 150 ++++++ 6 files changed, 1957 insertions(+) create mode 100644 internal/scheduler/README.md create mode 100644 internal/scheduler/metrics.go create mode 100644 internal/scheduler/scheduler.go create mode 100644 internal/scheduler/scheduler.svg create mode 100644 internal/scheduler/scheduler_test.go create mode 100644 internal/scheduler/soak_test.go diff --git a/internal/scheduler/README.md b/internal/scheduler/README.md new file mode 100644 index 0000000..8a8b14c --- /dev/null +++ b/internal/scheduler/README.md @@ -0,0 +1,670 @@ +# Scheduler Redesign: Weighted Fair Queuing with Conflict Exclusion + +## Problem Statement + +The current internal async task queue has a max concurrency limit and sub-queues bounded to 1 for synchronisation. This is insufficient for real-world usage: + +1. **Client DoS**: A single client initiated 1000 parallel clones, starving all other clients. +2. **Uniform cost assumption**: All jobs are treated equally, but `linux.git` clone is far more intensive than `git.git` clone. +3. **No foreground/background interaction**: Synchronous (foreground) work contributes to load, but the scheduler can't account for it because only async (background) jobs go through it. + +## Design Overview + +![Scheduler flow diagram](scheduler.svg) + +All work — foreground and background — goes through a single scheduler. The scheduler uses **cost-based fair queuing** to ensure no single client monopolises the system, with **conflict key exclusion** to prevent unsafe concurrent operations on the same resource. + +### Core Concepts + +**Job**: The unit of work, identified by `(job_type, job_id)`. Optionally carries a `fairness_key` for foreground work. + +**Cost**: A `time.Duration` representing the relative system impact of a job. The scheduler automatically learns the cost of each `(job_type, job_id)` pair using an exponential moving average of observed execution time. On first encounter, the `DefaultCost` from the job type config is used. Callers never specify cost explicitly. + +**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, its estimated cost is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next. + +**Fairness key**: An opaque string on the job, populated by the caller. For foreground jobs, this is typically the client IP or identity. For background jobs, this is empty. The scheduler doesn't know what it represents — it just uses it for ordering. + +**Conflict group**: A named group on the job type config. Two jobs conflict if they share a `job_id` and belong to the same non-empty conflict group. For example, `sync-clone`, `repack`, and `pull` all belong to the `"git"` conflict group — at most one of these can run on a given repo at a time. `snapshot` has no conflict group, so it runs concurrently with anything. This avoids the error-prone per-type conflict lists and ensures symmetry automatically. + +**Priority**: A struct with `Level` (dispatch ordering) and `Weight` (share of total concurrency). Priority is a property of a scheduling tier, not an individual job type — multiple job types share the same `Priority` value. + +**Total concurrency**: A global cap on the maximum number of concurrently running jobs across all tiers, configured via `Config.TotalConcurrency`. + +**Weight**: Each priority tier has a `Weight`. The scheduler divides `TotalConcurrency` among tiers proportionally: a tier's slot allocation is `Weight / sum(all tiers' weights) * TotalConcurrency`. For example, with `TotalConcurrency=50`, a foreground tier with weight 4 and a background tier with weight 1 get 40 and 10 slots respectively. + +**Max concurrency (per type)**: A fraction (0–1) of the tier's computed slot allocation that a single job type may consume. For example, `MaxConcurrency: 0.3` on a type in a tier with 10 slots means at most 3 concurrent jobs of that type. The floor is 1, so every registered type can always run at least one job. + +### Job Model + +```go +// JobType is a named string type for type safety. Constants are defined +// by the application, not the scheduler package, keeping the scheduler +// agnostic to domain concepts like git. +type JobType string +``` + +### Priority and Job Type Configuration + +```go +// Priority defines a scheduling tier with dispatch ordering and a share of +// total concurrency. Two Priority values with the same Level are the same +// tier and must have the same Weight. +type Priority struct { + Level int // higher = dispatched first + Weight float64 // weight for dividing TotalConcurrency among tiers +} + +type ConflictGroup string + +type JobTypeConfig struct { + DefaultCost time.Duration // initial cost estimate for jobs of this type + MaxConcurrency float64 // fraction (0-1) of tier's slots this type may use + ConflictGroup ConflictGroup // jobs in same group conflict on same job_id + Priority Priority // scheduling tier this job type belongs to +} +``` + +Example registrations: + +```go +// Application-level constants, not in the scheduler package +var ( + PriorityForeground = scheduler.Priority{Level: 10, Weight: 4} + PriorityBackground = scheduler.Priority{Level: 5, Weight: 1} +) + +const ( + JobTypeSyncClone scheduler.JobType = "sync-clone" + JobTypeRepack scheduler.JobType = "repack" + JobTypePull scheduler.JobType = "pull" + JobTypeSnapshot scheduler.JobType = "snapshot" + + ConflictGroupGit scheduler.ConflictGroup = "git" +) + +scheduler.RegisterType(JobTypeSyncClone, scheduler.JobTypeConfig{ + DefaultCost: 10 * time.Second, + MaxConcurrency: 1.0, + ConflictGroup: ConflictGroupGit, + Priority: PriorityForeground, +}) + +scheduler.RegisterType(JobTypeRepack, scheduler.JobTypeConfig{ + DefaultCost: 20 * time.Second, + MaxConcurrency: 0.3, + ConflictGroup: ConflictGroupGit, + Priority: PriorityBackground, +}) + +scheduler.RegisterType(JobTypePull, scheduler.JobTypeConfig{ + DefaultCost: 10 * time.Second, + MaxConcurrency: 0.3, + ConflictGroup: ConflictGroupGit, + Priority: PriorityBackground, +}) + +// No ConflictGroup — never conflicts with anything +scheduler.RegisterType(JobTypeSnapshot, scheduler.JobTypeConfig{ + DefaultCost: 5 * time.Second, + MaxConcurrency: 0.5, + Priority: PriorityBackground, +}) +``` + +With `TotalConcurrency=50`: foreground gets `4/(4+1) * 50 = 40` slots, background gets `1/(4+1) * 50 = 10` slots. Repack can use at most `0.3 * 10 = 3` of those background slots. + +### Calling Patterns + +Foreground (synchronous) — the caller blocks until the job completes: + +```go +func (s *Scheduler) RunSync( + ctx context.Context, + jobType JobType, + jobID string, + fairnessKey string, + fn func(ctx context.Context) error, +) error +``` + +```go +err := scheduler.RunSync( + ctx, + JobTypeSyncClone, + "github.com/torvalds/linux", + request.RemoteAddr, + func(ctx context.Context) error { + return cloneRepo(ctx, "github.com/torvalds/linux") + }, +) +``` + +Background (async) — returns immediately, job runs when admitted: + +```go +func (s *Scheduler) Submit( + jobType JobType, + jobID string, + fn func(ctx context.Context) error, +) +``` + +```go +scheduler.Submit( + JobTypeRepack, + "github.com/torvalds/linux", + func(ctx context.Context) error { + return repackRepo(ctx, "github.com/torvalds/linux") + }, +) +``` + +Both enter the same pending queue and the same admission logic. `RunSync` blocks on a completion signal before returning to the caller. + +## Dispatch Algorithm + +The entire scheduling algorithm: + +``` +sort pending jobs by (-priority.level, accumulated_cost[fairness_key], arrival_time) + +for each job in sorted order: + if total_running >= config.total_concurrency → skip + tier_slots = tier.weight / sum(all tier ratios) * total_concurrency + if count(running where priority.level == job.priority.level) >= tier_slots → skip + type_slots = max(1, int(type.max_concurrency * tier_slots)) + if type_running_count >= type_slots → skip + if any running job has same job_id AND same non-empty conflict_group → skip + admit job + estimated_cost = cost_estimates[(job_type, job_id)] or type.default_cost + accumulated_cost[fairness_key] += estimated_cost +``` + +When a job completes: + +``` +elapsed = wall time since job started +cost_estimates[(job_type, job_id)] = α * elapsed + (1-α) * cost_estimates[(job_type, job_id)] +re-evaluate pending queue for newly admissible jobs +``` + +Key properties of this algorithm: + +- **Total concurrency**: a hard global cap prevents the system from being overloaded regardless of tier configuration. +- **Priority**: foreground always dispatched before background. Background only runs in capacity not used by foreground. +- **Proportional tier allocation**: each priority tier gets a share of total concurrency based on its `Weight` relative to all other tiers' weights. This makes configuration scale-independent — changing `TotalConcurrency` adjusts all tiers proportionally. +- **Per-type limits**: within a tier, individual job types can be capped to a fraction of the tier's allocation, preventing expensive operations from monopolising the tier. +- **Fairness**: within a priority level, jobs from the fairness key with the lowest accumulated cost go first. A client that has consumed a lot of capacity yields to one that has consumed little. +- **Cost-awareness**: expensive jobs advance accumulated cost faster, so they naturally yield to cheaper work from other clients. A `linux.git` clone that takes 60 seconds advances the client's accumulated cost by ~60s, while a `git.git` clone that takes 5 seconds advances it by ~5s. +- **Adaptive**: the scheduler automatically learns the cost of each `(job_type, job_id)` pair. No manual cost tuning required. After one execution, estimates are already meaningful. +- **Conflict safety**: conflicting jobs on the same resource stay in the pending queue, not consuming concurrency slots while they wait. +- **No head-of-line blocking**: if the next job by ordering is blocked (conflict or concurrency limit), the scheduler skips it and admits the next admissible job. + +## Cost Estimation + +The scheduler maintains an exponential moving average of observed wall time per `(job_type, job_id)`: + +``` +estimatedCost = α * observedWallTime + (1-α) * estimatedCost +``` + +`α` is the smoothing factor (0–1) controlling how quickly estimates adapt. `α = 0.3` is a reasonable default — it converges from `DefaultCost` to the true value within a handful of runs, while remaining stable against outliers (e.g., a single slow clone due to network congestion won't drastically inflate the estimate). Should be a configurable constant. + +Wall time directly measures the resource being rationed — how long a job holds a concurrency slot. On first encounter of a `(job_type, job_id)` pair, `DefaultCost` from the job type config is used. After one execution, the estimate is based on real data. + +The estimates map needs TTL-based cleanup, same as the accumulated cost map. Estimates could optionally be persisted across restarts to avoid cold-start inaccuracy, using the existing persistence layer. + +## Accumulated Cost Lifecycle + +The accumulated cost map needs periodic cleanup since fairness keys (client IPs) are ephemeral — thousands of agentic workstations may spin up and down. Options: + +- **TTL-based eviction**: remove entries not seen for N minutes. +- **Periodic reset**: zero all counters every N minutes. +- **Advance idle keys**: when a key is seen again after being idle, advance it to the current global minimum (prevents penalising returning clients, prevents exploiting fresh counters). + +Start with TTL-based eviction and refine based on production behaviour. + +## Go Implementation Notes + +### Building Blocks + +- `golang.org/x/sync/semaphore` — *not* needed. The weighted semaphore approach was considered and rejected in favour of simple concurrency counting, which avoids starvation issues with high-cost jobs. +- `container/heap` — useful for the priority queue ordering. +- `sync.Cond` or channel — for waking the dispatch loop when a job completes. + +### Synchronisation Concern + +The previous implementation conflated synchronisation (mutex per resource) with scheduling. In this design, synchronisation is handled by conflict groups within the scheduler. There is no external mutex — the scheduler itself ensures jobs in the same conflict group don't run concurrently on the same resource by keeping them in the pending queue. + +### Persistence + +The existing persistence layer for scheduled jobs (recording last execution time to avoid thundering herd on restart) remains unchanged. It's orthogonal to the scheduling algorithm. + +### Prior Art + +The Kubernetes API Priority and Fairness system (`k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing`) solves a very similar problem for the kube-apiserver. It uses priority levels, flow distinguishers (fairness keys), shuffle sharding, and work estimation in "seats" (cost). The KEP is worth reading for context: + +https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md + +The k8s implementation is too coupled to the apiserver to use as a library, but the design concepts directly informed this approach. Our design is simpler: no shuffle sharding (explicit fairness keys instead), no dynamic reconfiguration, and we add conflict key exclusion which k8s doesn't have. + +--- + +## Appendix: Simulation Code + +The following Python simulation validates the design. It demonstrates: + +1. Background saturated, foreground arrives — starts immediately due to tier concurrency allocation +2. Two client burst + background + conflict interactions +3. Expensive vs cheap clones with cost-based fairness +4. Conflict key preventing unsafe concurrent access +5. Many background types across 4 job types, foreground still starts immediately + +```python +from dataclasses import dataclass, field +from collections import defaultdict + + +@dataclass +class Job: + job_type: str + job_id: str + fairness_key: str + cost: int + duration: int + arrival_time: int = 0 + + def __repr__(self): + fk = self.fairness_key or "bg" + return f"{self.job_type}({self.job_id}, fk={fk}, cost={self.cost}, dur={self.duration})" + + +@dataclass +class RunningJob: + job: Job + finish_time: int + + +@dataclass +class Priority: + level: int # higher = dispatched first + weight: float # weight for dividing total_concurrency among tiers + + +@dataclass +class JobTypeConfig: + max_concurrency: float = 1.0 # fraction (0-1) of tier's slots + conflict_group: str = "" + priority: Priority = field(default_factory=lambda: Priority(0, 1)) + + +class Scheduler: + def __init__(self, total_concurrency): + self.total_concurrency = total_concurrency + self.all_jobs = [] + self.pending = [] + self.running = [] + self.virtual_time = defaultdict(int) # accumulated cost per fairness key + self.type_configs = {} + self.completed = [] + self.log = [] + + def register_type(self, name, config): + self.type_configs[name] = config + + def submit(self, job): + self.all_jobs.append(job) + + def _get_priority(self, job): + config = self.type_configs.get(job.job_type) + return config.priority if config else Priority(0, 1) + + def _tier_slots(self, priority): + """Compute slots for a tier based on its ratio relative to all tiers.""" + seen = {} + for cfg in self.type_configs.values(): + if cfg.priority.level not in seen: + seen[cfg.priority.level] = cfg.priority.weight + total_ratio = sum(seen.values()) + if total_ratio == 0: + return 1 + return max(1, int(priority.weight / total_ratio * self.total_concurrency)) + + def _type_running_count(self, job_type): + return sum(1 for rj in self.running if rj.job.job_type == job_type) + + def _priority_running_count(self, level): + return sum( + 1 for rj in self.running + if self._get_priority(rj.job).level == level + ) + + def _has_conflict(self, job): + config = self.type_configs.get(job.job_type) + if not config or not config.conflict_group: + return False + for rj in self.running: + if rj.job.job_id == job.job_id: + running_config = self.type_configs.get(rj.job.job_type) + if running_config and running_config.conflict_group == config.conflict_group: + return True + return False + + def _arrive_jobs(self, current_time): + newly_arrived = [j for j in self.all_jobs if j.arrival_time == current_time] + for j in newly_arrived: + self.all_jobs.remove(j) + self.pending.append(j) + self.log.append(f" T={current_time:>3}: ARRIVE {j}") + + def _dispatch(self, current_time): + admitted_any = True + while admitted_any and len(self.running) < self.total_concurrency: + admitted_any = False + + scorable = [] + for i, job in enumerate(self.pending): + priority = self._get_priority(job) + vt = self.virtual_time[job.fairness_key] + scorable.append((-priority.level, vt, job.arrival_time, i, job)) + scorable.sort() + + for _, vt, _, idx, job in scorable: + config = self.type_configs.get(job.job_type) + priority = self._get_priority(job) + + # Check total concurrency + if len(self.running) >= self.total_concurrency: + continue + + # Check tier concurrency + tier_slots = self._tier_slots(priority) + if self._priority_running_count(priority.level) >= tier_slots: + continue + + # Check type concurrency (fraction of tier slots) + type_slots = max(1, int(config.max_concurrency * tier_slots)) + if self._type_running_count(job.job_type) >= type_slots: + continue + + # Check conflict + if self._has_conflict(job): + continue + + self.pending.remove(job) + self.running.append( + RunningJob(job=job, finish_time=current_time + job.duration) + ) + self.virtual_time[job.fairness_key] += job.cost + fk = job.fairness_key or "bg" + pri_run = self._priority_running_count(priority.level) + self.log.append( + f" T={current_time:>3}: ADMIT {job} " + f"(level={priority.level}, tier_slots={tier_slots}, " + f"VT[{fk}]={self.virtual_time[job.fairness_key]}, " + f"tier_running={pri_run}, total={len(self.running)})" + ) + admitted_any = True + break + + def _complete_jobs(self, current_time): + still_running = [] + for rj in self.running: + if rj.finish_time <= current_time: + self.completed.append((current_time, rj.job)) + self.log.append(f" T={current_time:>3}: DONE {rj.job}") + else: + still_running.append(rj) + self.running = still_running + + def run(self, until): + for t in range(until + 1): + self._arrive_jobs(t) + self._complete_jobs(t) + self._dispatch(t) + return self.log, self.completed, dict(self.virtual_time) + + def print_summary(self): + print(f"\nFinal virtual times:") + for k, v in sorted(self.virtual_time.items(), key=lambda x: x[1]): + print(f" {k or 'bg':>15}: {v}") + + by_key = defaultdict(list) + for t, j in self.completed: + by_key[j.fairness_key or "bg"].append((t, j.job_type, j.job_id)) + + print(f"\nCompletions by fairness key:") + for k in sorted(by_key.keys()): + times = [t for t, _, _ in by_key[k]] + print(f" {k:>15}: completed at {times}") + + +def make_scheduler(): + """Standard scheduler config for all scenarios. + + total_concurrency=8, fg ratio=4, bg ratio=1 + → fg gets 4/5*8=6 slots, bg gets 1/5*8=1 slot + """ + fg = Priority(level=10, weight=4) + bg = Priority(level=5, weight=1) + + s = Scheduler(total_concurrency=8) + + s.register_type( + "sync-clone", + JobTypeConfig( + max_concurrency=1.0, + conflict_group="git", + priority=fg, + ), + ) + s.register_type( + "repack", + JobTypeConfig( + max_concurrency=1.0, + conflict_group="git", + priority=bg, + ), + ) + s.register_type( + "pull", + JobTypeConfig( + max_concurrency=1.0, + conflict_group="git", + priority=bg, + ), + ) + return s + + +def scenario_1(): + print("=" * 90) + print("SCENARIO 1: Background saturated, foreground arrives late") + print("total=8, fg ratio=4 (6 slots), bg ratio=1 (1 slot)") + print("=" * 90) + + s = make_scheduler() + + # T=0: lots of bg work fills bg capacity + for i in range(1, 7): + s.submit(Job("repack", f"r{i}", "", cost=20, duration=8, arrival_time=0)) + for i in range(7, 11): + s.submit(Job("pull", f"r{i}", "", cost=10, duration=6, arrival_time=0)) + + # T=3: foreground arrives — should start immediately + s.submit(Job("sync-clone", "r99", "dev1", cost=10, duration=2, arrival_time=3)) + + s.run(until=25) + for line in s.log: + print(line) + s.print_summary() + + for t, j in s.completed: + if j.fairness_key == "dev1": + print( + f"\n>>> Foreground arrived T=3, completed T={t}. " + f"Wait = {t - 3 - j.duration} time units" + ) + + +def scenario_2(): + print("\n" + "=" * 90) + print("SCENARIO 2: Two clients burst + background + conflicts") + print("=" * 90) + + s = make_scheduler() + + # Background maintenance running + for i in range(1, 5): + s.submit(Job("repack", f"repo{i}", "", cost=20, duration=6, arrival_time=0)) + + # T=1: Client A bursts 10 clones + for i in range(1, 11): + s.submit( + Job("sync-clone", f"a{i}", "clientA", cost=10, duration=3, arrival_time=1) + ) + + # T=2: Client B needs just 2 clones + s.submit( + Job("sync-clone", "b1", "clientB", cost=10, duration=3, arrival_time=2) + ) + s.submit( + Job("sync-clone", "b2", "clientB", cost=10, duration=3, arrival_time=2) + ) + + s.run(until=30) + for line in s.log: + print(line) + s.print_summary() + + +def scenario_3(): + print("\n" + "=" * 90) + print("SCENARIO 3: Expensive vs cheap clones with fairness") + print("=" * 90) + + s = make_scheduler() + + # Client A: one massive clone + s.submit( + Job("sync-clone", "linux", "clientA", cost=100, duration=15, arrival_time=0) + ) + + # Client B: 8 small clones + for i in range(1, 9): + s.submit( + Job( + "sync-clone", f"small{i}", "clientB", cost=5, duration=2, arrival_time=0 + ) + ) + + # Client C arrives later with 2 medium clones + s.submit( + Job("sync-clone", "med1", "clientC", cost=20, duration=4, arrival_time=5) + ) + s.submit( + Job("sync-clone", "med2", "clientC", cost=20, duration=4, arrival_time=5) + ) + + s.run(until=25) + for line in s.log: + print(line) + s.print_summary() + + +def scenario_4(): + print("\n" + "=" * 90) + print("SCENARIO 4: Conflict key prevents unsafe concurrent access") + print("=" * 90) + + s = make_scheduler() + + # Clone and repack on same repo — must not overlap + s.submit(Job("sync-clone", "repo1", "dev1", cost=10, duration=3, arrival_time=0)) + s.submit(Job("repack", "repo1", "", cost=20, duration=4, arrival_time=0)) + + # Clone and repack on different repo — should run in parallel + s.submit(Job("sync-clone", "repo2", "dev2", cost=10, duration=3, arrival_time=0)) + s.submit(Job("repack", "repo2", "", cost=20, duration=4, arrival_time=0)) + + s.run(until=20) + for line in s.log: + print(line) + s.print_summary() + + print("\nConflict safety check:") + print( + " repo1: clone finishes before repack starts ✓" + if any( + t <= 3 + for t, j in s.completed + if j.job_id == "repo1" and j.job_type == "sync-clone" + ) + else " repo1: CONFLICT VIOLATION" + ) + print(" repo2: clone and repack on different repos ran in parallel ✓") + + +def scenario_5(): + print("\n" + "=" * 90) + print("SCENARIO 5: Many background types don't starve foreground") + print("Multiple bg types each wanting lots of concurrency") + print("=" * 90) + + fg = Priority(level=10, weight=4) + bg = Priority(level=3, weight=1) + + s = Scheduler(total_concurrency=8) + + # Register 4 different background job types + for bg_type in ["repack", "pull", "gc", "verify"]: + s.register_type( + bg_type, + JobTypeConfig(max_concurrency=1.0, conflict_group="git", priority=bg), + ) + s.register_type( + "sync-clone", + JobTypeConfig(max_concurrency=1.0, conflict_group="git", priority=fg), + ) + + # T=0: 3 jobs of each bg type = 12 bg jobs total + for bg_type in ["repack", "pull", "gc", "verify"]: + for i in range(1, 4): + s.submit( + Job(bg_type, f"{bg_type}{i}", "", cost=15, duration=6, arrival_time=0) + ) + + # T=2: foreground burst + for i in range(1, 5): + s.submit( + Job( + "sync-clone", + f"clone{i}", + "clientA", + cost=10, + duration=2, + arrival_time=2, + ) + ) + + s.run(until=30) + for line in s.log: + print(line) + s.print_summary() + + for t, j in s.completed: + if j.fairness_key == "clientA" and j.job_id == "clone1": + print(f"\n>>> First foreground job arrived T=2, completed T={t}") + print( + ">>> Despite 12 background jobs across 4 types, fg started immediately" + ) + break + + +if __name__ == "__main__": + scenario_1() + scenario_2() + scenario_3() + scenario_4() + scenario_5() +``` diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go new file mode 100644 index 0000000..1331d81 --- /dev/null +++ b/internal/scheduler/metrics.go @@ -0,0 +1,25 @@ +package scheduler + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + + "github.com/block/cachew/internal/metrics" +) + +type schedulerMetrics struct { + pendingJobs metric.Int64Gauge + runningJobs metric.Int64Gauge + jobsTotal metric.Int64Counter + jobDuration metric.Float64Histogram +} + +func newSchedulerMetrics() *schedulerMetrics { + meter := otel.Meter("cachew.scheduler") + return &schedulerMetrics{ + pendingJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.pending_jobs", "{jobs}", "Number of jobs waiting in the pending queue"), + runningJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.running_jobs", "{jobs}", "Number of jobs currently executing"), + jobsTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.scheduler.jobs_total", "{jobs}", "Total number of completed scheduler jobs"), + jobDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.scheduler.job_duration_seconds", "s", "Duration of scheduler jobs in seconds"), + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..c52fa07 --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,568 @@ +// Package scheduler implements weighted fair queuing with conflict exclusion. +// +// All work — foreground and background — flows through a single scheduler that +// controls admission via four layered constraints: +// +// - Total concurrency: a hard global cap on running jobs. +// - Priority tiers: each tier (e.g. foreground, background) gets a +// weighted share of total concurrency. Higher-level tiers are dispatched +// first, so foreground work is never starved by background. +// - Per-type limits: within a tier, each job type may use at most a +// fraction of the tier's slots (MaxConcurrency). +// - Conflict groups: jobs sharing a conflict group and job ID are +// mutually exclusive, preventing unsafe concurrent operations on the +// same resource. +// +// Within a tier, jobs are ordered by accumulated cost per fairness key +// (lowest first, then arrival time). Cost is estimated via an exponential +// moving average of observed wall time, so clients that consume more +// capacity naturally yield to those that have consumed less. +package scheduler + +import ( + "context" + "log/slog" + "slices" + "strconv" + "sync" + "time" + + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/metadatadb" +) + +// JobType is a named string type for type safety. Constants are defined by the +// application, not the scheduler package. +type JobType string + +func (jt JobType) String() string { return string(jt) } + +// Priority defines a scheduling tier with dispatch ordering and a share of total concurrency. +// Two Priority values with the same Level are the same tier and must have the same Weight. +type Priority struct { + // Level controls dispatch ordering; higher values are dispatched first. + Level int + // Weight is this tier's weight when dividing TotalConcurrency among tiers. + // A tier's slot allocation is Weight / sum(all tiers' weights) * TotalConcurrency. + Weight float64 +} + +// ConflictGroup names a set of job types that conflict on the same job ID. Two +// jobs conflict if they share a job_id and belong to the same non-empty +// conflict group. +// +// For example the jobs `(type:"repack", id:"git/git")` and `(type:"snapshot", id:"git/git")` +// if in the same ConflictGroup would not run concurrently. +type ConflictGroup string + +// JobTypeConfig configures scheduling behaviour for a job type. +type JobTypeConfig struct { + // DefaultCost is the initial cost estimate for jobs of this type. + DefaultCost time.Duration + // MaxConcurrency is the fraction (0-1) of the priority tier's slots this job type may use. + MaxConcurrency float64 + // ConflictGroup prevents concurrent jobs with the same job ID within the group. + ConflictGroup ConflictGroup + // Priority is the scheduling tier this job type belongs to. Must be registered via RegisterPriority. + Priority Priority +} + +// Config holds scheduler tuning parameters. +type Config struct { + // TotalConcurrency is the maximum number of jobs that can run simultaneously across all tiers. + TotalConcurrency int `hcl:"total-concurrency" help:"Maximum total concurrent jobs." default:"50"` + Alpha float64 `hcl:"alpha,optional" help:"EMA smoothing factor for cost estimation (0-1)." default:"0.3"` + CostTTL time.Duration `hcl:"cost-ttl,optional" help:"TTL for cost estimate entries." default:"1h"` + FairnessTTL time.Duration `hcl:"fairness-ttl,optional" help:"TTL for accumulated cost entries." default:"10m"` + CleanupInterval time.Duration `hcl:"cleanup-interval,optional" help:"How often to run TTL cleanup." default:"1m"` +} + +type job struct { + jobType JobType + jobID string + fairnessKey string + fn func(ctx context.Context) error + arrivalTime time.Time + done chan error // non-nil for RunSync +} + +func (j *job) String() string { return j.jobType.String() + ":" + j.jobID } + +type runningJob struct { + job *job + startTime time.Time +} + +type costKey struct { + jobType JobType + jobID string +} + +type costEntry struct { + estimate time.Duration + lastSeen time.Time +} + +type fairnessEntry struct { + cost time.Duration + lastSeen time.Time +} + +// Scheduler implements weighted fair queuing with conflict exclusion. +type Scheduler struct { + mu sync.Mutex + priorities map[Priority]bool + types map[JobType]JobTypeConfig + pending []*job + running []*runningJob + fairness map[string]*fairnessEntry + costs map[costKey]*costEntry + config Config + logger *slog.Logger + + ctx context.Context + cancel context.CancelFunc + wake chan struct{} + wg sync.WaitGroup + + metrics *schedulerMetrics + + // Optional persistence for periodic job last-run times. + lastRuns *metadatadb.Map[string, int64] + lastRunsLocal map[string]time.Time + + now func() time.Time // for testing; defaults to time.Now +} + +// New creates a new Scheduler. If ns is non-nil it is used to persist periodic +// job last-run times across restarts. +func New(ctx context.Context, config Config, ns *metadatadb.Namespace) (*Scheduler, error) { + m := newSchedulerMetrics() + ctx, cancel := context.WithCancel(ctx) + s := &Scheduler{ + priorities: make(map[Priority]bool), + types: make(map[JobType]JobTypeConfig), + fairness: make(map[string]*fairnessEntry), + costs: make(map[costKey]*costEntry), + lastRunsLocal: make(map[string]time.Time), + config: config, + logger: logging.FromContext(ctx), + ctx: ctx, + cancel: cancel, + wake: make(chan struct{}, 1), + metrics: m, + now: time.Now, + } + if ns != nil { + s.lastRuns = metadatadb.NewMap[string, int64](ns, "lastRuns") + } + s.wg.Add(2) + go s.dispatchLoop() + go s.cleanupLoop() + return s, nil +} + +// Close stops the scheduler and waits for background goroutines. +func (s *Scheduler) Close() { + s.cancel() + s.wg.Wait() +} + +// RegisterPriority registers a priority tier. Must be called before +// registering job types that reference this priority. Panics if already +// registered. +func (s *Scheduler) RegisterPriority(p Priority) { + s.mu.Lock() + defer s.mu.Unlock() + if s.priorities[p] { + panic("scheduler: priority already registered: " + strconv.Itoa(p.Level)) + } + s.priorities[p] = true +} + +// RegisterType registers a job type with its configuration. Must be called +// before submitting jobs of that type. Panics if the priority has not been +// registered via RegisterPriority. +func (s *Scheduler) RegisterType(jobType JobType, config JobTypeConfig) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.priorities[config.Priority] { + panic("scheduler: unregistered priority: " + strconv.Itoa(config.Priority.Level)) + } + s.types[jobType] = config +} + +func (s *Scheduler) validateType(jt JobType) { + if _, ok := s.types[jt]; !ok { + panic("scheduler: unregistered job type: " + jt.String()) + } +} + +// Submit queues a background job for async execution. Returns immediately. +func (s *Scheduler) Submit(jobType JobType, jobID string, fn func(ctx context.Context) error) { + s.mu.Lock() + s.validateType(jobType) + s.pending = append(s.pending, &job{ + jobType: jobType, + jobID: jobID, + fn: fn, + arrivalTime: s.now(), + }) + s.mu.Unlock() + s.signal() +} + +// RunSync submits a foreground job and blocks until it completes or ctx is +// cancelled. The fn receives a context that is cancelled when either the +// caller's ctx or the scheduler's context is done. +func (s *Scheduler) RunSync(ctx context.Context, jobType JobType, jobID, fairnessKey string, fn func(ctx context.Context) error) error { + jobCtx, jobCancel := context.WithCancel(ctx) + stop := context.AfterFunc(s.ctx, jobCancel) + + s.mu.Lock() + s.validateType(jobType) + s.mu.Unlock() + + done := make(chan error, 1) + j := &job{ + jobType: jobType, + jobID: jobID, + fairnessKey: fairnessKey, + fn: func(_ context.Context) error { return fn(jobCtx) }, + arrivalTime: s.now(), + done: done, + } + s.mu.Lock() + s.pending = append(s.pending, j) + s.mu.Unlock() + s.signal() + + select { + case err := <-done: + stop() + jobCancel() + return err + case <-ctx.Done(): + s.mu.Lock() + s.removePendingLocked(j) + s.mu.Unlock() + stop() + jobCancel() + return errors.WithStack(ctx.Err()) + } +} + +// SubmitPeriodicJob submits a recurring background job. The first execution is +// delayed by the remaining interval since the last recorded run (if any). +func (s *Scheduler) SubmitPeriodicJob(jobType JobType, jobID string, interval time.Duration, fn func(ctx context.Context) error) { + key := string(jobType) + "\x00" + jobID + delay := s.periodicDelay(key, interval) + submit := func() { + s.Submit(jobType, jobID, func(ctx context.Context) error { + err := fn(ctx) + s.recordLastRun(key) + go func() { + select { + case <-time.After(interval): + s.SubmitPeriodicJob(jobType, jobID, interval, fn) + case <-s.ctx.Done(): + } + }() + return err + }) + } + if delay <= 0 { + submit() + return + } + go func() { + select { + case <-time.After(delay): + submit() + case <-s.ctx.Done(): + } + }() +} + +// signal wakes the dispatch loop. Non-blocking; coalesces multiple signals. +func (s *Scheduler) signal() { + select { + case s.wake <- struct{}{}: + default: + } +} + +func (s *Scheduler) dispatchLoop() { + defer s.wg.Done() + for { + select { + case <-s.ctx.Done(): + return + case <-s.wake: + s.dispatch() + } + } +} + +// dispatch evaluates the pending queue and admits all eligible jobs. +func (s *Scheduler) dispatch() { + s.mu.Lock() + slices.SortFunc(s.pending, s.compareJobs) + + var toRun []*job + n := 0 + for _, j := range s.pending { + if s.canAdmitLocked(j) { + est := s.estimatedCostLocked(j) + s.addFairnessLocked(j.fairnessKey, est) + s.running = append(s.running, &runningJob{job: j, startTime: s.now()}) + toRun = append(toRun, j) + } else { + s.pending[n] = j + n++ + } + } + clear(s.pending[n:]) + s.pending = s.pending[:n] + s.recordMetricsLocked() + s.mu.Unlock() + + for _, j := range toRun { + go s.executeJob(j) + } +} + +func (s *Scheduler) compareJobs(a, b *job) int { + pa := s.types[a.jobType].Priority.Level + pb := s.types[b.jobType].Priority.Level + if pa != pb { + return pb - pa // higher priority first + } + ca := s.fairnessCostLocked(a.fairnessKey) + cb := s.fairnessCostLocked(b.fairnessKey) + switch { + case ca < cb: + return -1 + case ca > cb: + return 1 + default: + return a.arrivalTime.Compare(b.arrivalTime) + } +} + +func (s *Scheduler) canAdmitLocked(j *job) bool { + if len(s.running) >= s.config.TotalConcurrency { + return false + } + cfg := s.types[j.jobType] + tierSlots := s.tierSlotsLocked(cfg.Priority.Weight) + if s.priorityRunningCountLocked(cfg.Priority.Level) >= tierSlots { + return false + } + typeSlots := max(1, int(cfg.MaxConcurrency*float64(tierSlots))) + if s.typeRunningCountLocked(j.jobType) >= typeSlots { + return false + } + return !s.hasConflictLocked(j) +} + +// tierSlotsLocked computes the number of slots for a priority tier as its +// share of TotalConcurrency, proportional to the tier's Weight relative to +// the sum of all registered tiers' weights. +func (s *Scheduler) tierSlotsLocked(weight float64) int { + var totalWeight float64 + for p := range s.priorities { + totalWeight += p.Weight + } + if totalWeight == 0 { + return 1 + } + return max(1, int(weight/totalWeight*float64(s.config.TotalConcurrency))) +} + +func (s *Scheduler) executeJob(j *job) { + start := s.now() + s.logger.InfoContext(s.ctx, "Starting job", "job", j) + err := j.fn(s.ctx) + elapsed := s.now().Sub(start) + + if err != nil { + s.logger.ErrorContext(s.ctx, "Job failed", "job", j, "error", err, "elapsed", elapsed) + } else { + s.logger.InfoContext(s.ctx, "Job completed", "job", j, "elapsed", elapsed) + } + + s.mu.Lock() + s.updateCostEstimateLocked(j.jobType, j.jobID, elapsed) + s.removeFromRunningLocked(j) + s.recordMetricsLocked() + s.mu.Unlock() + + if j.done != nil { + j.done <- err + } + s.signal() +} + +// --- Type registry helpers --- + +func (s *Scheduler) priorityRunningCountLocked(level int) int { + count := 0 + for _, rj := range s.running { + if s.types[rj.job.jobType].Priority.Level == level { + count++ + } + } + return count +} + +func (s *Scheduler) typeRunningCountLocked(jt JobType) int { + count := 0 + for _, rj := range s.running { + if rj.job.jobType == jt { + count++ + } + } + return count +} + +func (s *Scheduler) hasConflictLocked(j *job) bool { + cfg := s.types[j.jobType] + if cfg.ConflictGroup == "" { + return false + } + for _, rj := range s.running { + if rj.job.jobID != j.jobID { + continue + } + if s.types[rj.job.jobType].ConflictGroup == cfg.ConflictGroup { + return true + } + } + return false +} + +// --- Cost estimation --- + +func (s *Scheduler) estimatedCostLocked(j *job) time.Duration { + if entry, ok := s.costs[costKey{j.jobType, j.jobID}]; ok { + return entry.estimate + } + return s.types[j.jobType].DefaultCost +} + +func (s *Scheduler) updateCostEstimateLocked(jt JobType, jobID string, elapsed time.Duration) { + key := costKey{jt, jobID} + entry, ok := s.costs[key] + if !ok { + s.costs[key] = &costEntry{estimate: elapsed, lastSeen: s.now()} + return + } + alpha := s.config.Alpha + entry.estimate = time.Duration(alpha*float64(elapsed) + (1-alpha)*float64(entry.estimate)) + entry.lastSeen = s.now() +} + +// --- Fairness tracking --- + +func (s *Scheduler) fairnessCostLocked(key string) time.Duration { + if entry, ok := s.fairness[key]; ok { + return entry.cost + } + return 0 +} + +func (s *Scheduler) addFairnessLocked(key string, cost time.Duration) { + entry, ok := s.fairness[key] + if !ok { + entry = &fairnessEntry{} + s.fairness[key] = entry + } + entry.cost += cost + entry.lastSeen = s.now() +} + +// --- Periodic job persistence --- + +func (s *Scheduler) periodicDelay(key string, interval time.Duration) time.Duration { + var lastRun time.Time + if s.lastRuns != nil { + if nanos, ok := s.lastRuns.Get(key); ok { + lastRun = time.Unix(0, nanos) + } + } else { + s.mu.Lock() + lastRun = s.lastRunsLocal[key] + s.mu.Unlock() + } + if lastRun.IsZero() { + return 0 + } + if remaining := time.Until(lastRun.Add(interval)); remaining > 0 { + return remaining + } + return 0 +} + +func (s *Scheduler) recordLastRun(key string) { + now := s.now() + if s.lastRuns != nil { + s.lastRuns.Set(key, now.UnixNano()) + return + } + s.mu.Lock() + s.lastRunsLocal[key] = now + s.mu.Unlock() +} + +// --- Slice helpers --- + +func (s *Scheduler) removeFromRunningLocked(j *job) { + s.running = slices.DeleteFunc(s.running, func(rj *runningJob) bool { return rj.job == j }) +} + +func (s *Scheduler) removePendingLocked(j *job) { + s.pending = slices.DeleteFunc(s.pending, func(pj *job) bool { return pj == j }) +} + +// --- TTL cleanup --- + +func (s *Scheduler) cleanupLoop() { + defer s.wg.Done() + ticker := time.NewTicker(s.config.CleanupInterval) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.cleanup() + } + } +} + +func (s *Scheduler) cleanup() { + s.mu.Lock() + defer s.mu.Unlock() + now := s.now() + for key, entry := range s.fairness { + if now.Sub(entry.lastSeen) > s.config.FairnessTTL { + delete(s.fairness, key) + } + } + for key, entry := range s.costs { + if now.Sub(entry.lastSeen) > s.config.CostTTL { + delete(s.costs, key) + } + } +} + +// --- Metrics --- + +func (s *Scheduler) recordMetricsLocked() { + ctx := context.Background() + s.metrics.pendingJobs.Record(ctx, int64(len(s.pending))) + s.metrics.runningJobs.Record(ctx, int64(len(s.running))) +} diff --git a/internal/scheduler/scheduler.svg b/internal/scheduler/scheduler.svg new file mode 100644 index 0000000..b1deb76 --- /dev/null +++ b/internal/scheduler/scheduler.svg @@ -0,0 +1,97 @@ + + + + + + + + + + + + + + + Scheduler: Weighted Fair Queuing with Conflict Exclusion + + + + RunSync() + + + Submit() + + + + + + + + + + + + + Pending Queue + sorted by: -priority.level, accumulated cost, arrival + + + + + + + Admission Checks + + + + 1. Total concurrency + running < TotalConcurrency? + + + + 2. Tier concurrency + tier running < weight/sum(weights) * total? + + + + 3. Type concurrency + type running < MaxConcurrency * tier slots? + + + + 4. Conflict exclusion + no running job with same group + ID? + + + + any check fails: + skip, try next job + + + + all pass + + + + re-evaluate pending queue + + + + Running + job executes fn(ctx) + + + + Fairness Update + accumulated_cost[key] += est_cost + + + + + + + + Job Complete + cost estimate = alpha * elapsed + (1 - alpha) * prev estimate + + diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..d83bf1b --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,447 @@ +package scheduler_test + +import ( + "context" + "fmt" + "log/slog" + "sync" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/errors" + + "github.com/block/cachew/internal/logging" + "github.com/block/cachew/internal/scheduler" +) + +var ( + priFG = scheduler.Priority{Level: 10, Weight: 4} //nolint:gochecknoglobals + priBG = scheduler.Priority{Level: 5, Weight: 1} //nolint:gochecknoglobals +) + +type testJob struct { + started chan struct{} + finish chan error +} + +func newTestJob() *testJob { + return &testJob{ + started: make(chan struct{}), + finish: make(chan error, 1), + } +} + +func (j *testJob) fn(ctx context.Context) error { + close(j.started) + select { + case err := <-j.finish: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (j *testJob) complete() { j.finish <- nil } + +func (j *testJob) waitStarted(t *testing.T) { + t.Helper() + select { + case <-j.started: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for job to start") + } +} + +func (j *testJob) assertNotStarted(t *testing.T) { + t.Helper() + select { + case <-j.started: + t.Fatal("job started unexpectedly") + case <-time.After(50 * time.Millisecond): + } +} + +func testContext() context.Context { + logger, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelWarn}) + return logging.ContextWithLogger(ctx, logger) +} + +func newTestScheduler(t *testing.T) *scheduler.Scheduler { + t.Helper() + return newTestSchedulerWithConfig(t, scheduler.Config{ + TotalConcurrency: 50, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }) +} + +func newTestSchedulerWithConfig(t *testing.T, cfg scheduler.Config) *scheduler.Scheduler { + t.Helper() + s, err := scheduler.New(testContext(), cfg, nil) + assert.NoError(t, err) + t.Cleanup(s.Close) + s.RegisterPriority(priFG) + s.RegisterPriority(priBG) + return s +} + +func newCustomPriorityScheduler(t *testing.T, cfg scheduler.Config, priorities ...scheduler.Priority) *scheduler.Scheduler { + t.Helper() + s, err := scheduler.New(testContext(), cfg, nil) + assert.NoError(t, err) + t.Cleanup(s.Close) + for _, p := range priorities { + s.RegisterPriority(p) + } + return s +} + +func TestBasicSubmit(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: priFG}) + + tj := newTestJob() + s.Submit("work", "j1", tj.fn) + tj.waitStarted(t) + tj.complete() +} + +func TestRunSync(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: priFG}) + + called := false + err := s.RunSync(testContext(), "work", "j1", "client", func(_ context.Context) error { + called = true + return nil + }) + assert.NoError(t, err) + assert.True(t, called) +} + +func TestRunSyncReturnsError(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: priFG}) + + want := errors.New("boom") + err := s.RunSync(testContext(), "work", "j1", "client", func(_ context.Context) error { + return want + }) + assert.EqualError(t, err, "boom") +} + +func TestRunSyncContextCancellation(t *testing.T) { + s := newTestSchedulerWithConfig(t, scheduler.Config{ + TotalConcurrency: 1, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: priFG}) + + blocker := newTestJob() + s.Submit("work", "blocker", blocker.fn) + blocker.waitStarted(t) + + ctx, cancel := context.WithCancel(testContext()) + errCh := make(chan error, 1) + go func() { + errCh <- s.RunSync(ctx, "work", "j1", "client", func(_ context.Context) error { + return nil + }) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + syncErr := <-errCh + assert.IsError(t, syncErr, context.Canceled) + + blocker.complete() +} + +func TestPriorityOrdering(t *testing.T) { + s := newTestScheduler(t) + const conflict scheduler.ConflictGroup = "git" + s.RegisterType("fg", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priFG, ConflictGroup: conflict}) + s.RegisterType("bg", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priBG, ConflictGroup: conflict}) + + blocker := newTestJob() + s.Submit("fg", "repo1", blocker.fn) + blocker.waitStarted(t) + + // Submit bg first, then fg — both on repo1, both blocked by conflict. + bg := newTestJob() + s.Submit("bg", "repo1", bg.fn) + time.Sleep(10 * time.Millisecond) + fg := newTestJob() + s.Submit("fg", "repo1", fg.fn) + + bg.assertNotStarted(t) + fg.assertNotStarted(t) + + // Release blocker — fg should win despite arriving second. + blocker.complete() + fg.waitStarted(t) + bg.assertNotStarted(t) + + fg.complete() + bg.waitStarted(t) + bg.complete() +} + +func TestFairness(t *testing.T) { + s := newTestSchedulerWithConfig(t, scheduler.Config{ + TotalConcurrency: 1, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priBG}) + + // Build up accumulated cost for clientA. + err := s.RunSync(testContext(), "work", "warmup", "clientA", func(_ context.Context) error { + return nil + }) + assert.NoError(t, err) + + blocker := newTestJob() + s.Submit("work", "blocker", blocker.fn) + blocker.waitStarted(t) + + // Queue A (arrived first) then B. + jobA := newTestJob() + jobB := newTestJob() + var wg sync.WaitGroup + wg.Add(2) + go func() { defer wg.Done(); s.RunSync(testContext(), "work", "a1", "clientA", jobA.fn) }() //nolint:errcheck + time.Sleep(10 * time.Millisecond) + go func() { defer wg.Done(); s.RunSync(testContext(), "work", "b1", "clientB", jobB.fn) }() //nolint:errcheck + time.Sleep(10 * time.Millisecond) + + // Release blocker — B should go first (lower accumulated cost). + blocker.complete() + jobB.waitStarted(t) + jobA.assertNotStarted(t) + + jobB.complete() + jobA.waitStarted(t) + jobA.complete() + wg.Wait() +} + +func TestConflictGroupExclusion(t *testing.T) { + s := newTestScheduler(t) + const conflict scheduler.ConflictGroup = "git" + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priFG, ConflictGroup: conflict}) + s.RegisterType("repack", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priBG, ConflictGroup: conflict}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + repack := newTestJob() + s.Submit("repack", "repo1", repack.fn) + repack.assertNotStarted(t) + + clone.complete() + repack.waitStarted(t) + repack.complete() +} + +func TestConflictGroupDifferentIDs(t *testing.T) { + s := newTestScheduler(t) + const conflict scheduler.ConflictGroup = "git" + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priFG, ConflictGroup: conflict}) + s.RegisterType("repack", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priBG, ConflictGroup: conflict}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + // Different job ID — no conflict. + repack := newTestJob() + s.Submit("repack", "repo2", repack.fn) + repack.waitStarted(t) + + clone.complete() + repack.complete() +} + +func TestNoConflictGroup(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("clone", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priFG, ConflictGroup: "git"}) + s.RegisterType("snapshot", scheduler.JobTypeConfig{DefaultCost: 5 * time.Second, MaxConcurrency: 1, Priority: priBG}) + + clone := newTestJob() + s.Submit("clone", "repo1", clone.fn) + clone.waitStarted(t) + + snap := newTestJob() + s.Submit("snapshot", "repo1", snap.fn) + snap.waitStarted(t) + + clone.complete() + snap.complete() +} + +func TestTypeConcurrencyLimit(t *testing.T) { + s := newTestSchedulerWithConfig(t, scheduler.Config{ + TotalConcurrency: 50, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }) + // TotalConcurrency=50, fg weight=4, bg weight=1 → fg gets 40 tier slots. + // MaxConcurrency=0.05 → int(0.05*40) = 2 type slots. + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 0.05, Priority: priFG}) + + j1 := newTestJob() + j2 := newTestJob() + j3 := newTestJob() + s.Submit("work", "a", j1.fn) + s.Submit("work", "b", j2.fn) + s.Submit("work", "c", j3.fn) + + j1.waitStarted(t) + j2.waitStarted(t) + j3.assertNotStarted(t) + + j1.complete() + j3.waitStarted(t) + j2.complete() + j3.complete() +} + +func TestTierConcurrencyLimit(t *testing.T) { + // Two tiers with equal weight → 2 slots each out of TotalConcurrency=4. + fg := scheduler.Priority{Level: 10, Weight: 1} + bg := scheduler.Priority{Level: 5, Weight: 1} + s := newCustomPriorityScheduler(t, scheduler.Config{ + TotalConcurrency: 4, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }, fg, bg) + s.RegisterType("fgWork", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: fg}) + s.RegisterType("bgWork", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: bg}) + + j1 := newTestJob() + j2 := newTestJob() + j3 := newTestJob() + s.Submit("bgWork", "a", j1.fn) + s.Submit("bgWork", "b", j2.fn) + s.Submit("bgWork", "c", j3.fn) + + j1.waitStarted(t) + j2.waitStarted(t) + j3.assertNotStarted(t) + + j1.complete() + j3.waitStarted(t) + j2.complete() + j3.complete() +} + +func TestTotalConcurrencyLimit(t *testing.T) { + pri := scheduler.Priority{Level: 10, Weight: 1} + s := newCustomPriorityScheduler(t, scheduler.Config{ + TotalConcurrency: 2, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }, pri) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: time.Second, MaxConcurrency: 1, Priority: pri}) + + j1 := newTestJob() + j2 := newTestJob() + j3 := newTestJob() + s.Submit("work", "a", j1.fn) + s.Submit("work", "b", j2.fn) + s.Submit("work", "c", j3.fn) + + j1.waitStarted(t) + j2.waitStarted(t) + j3.assertNotStarted(t) + + j1.complete() + j3.waitStarted(t) + j2.complete() + j3.complete() +} + +func TestCostEstimation(t *testing.T) { + s := newTestScheduler(t) + s.RegisterType("work", scheduler.JobTypeConfig{DefaultCost: 100 * time.Second, MaxConcurrency: 1, Priority: priFG}) + + err := s.RunSync(testContext(), "work", "j1", "c", func(_ context.Context) error { + time.Sleep(50 * time.Millisecond) + return nil + }) + assert.NoError(t, err) + + // Verify the estimate was updated by running a second job and checking + // that accumulated cost reflects a learned (not default) value. The second + // job's estimated cost should be much less than the 100s default. + err = s.RunSync(testContext(), "work", "j1", "client2", func(_ context.Context) error { + return nil + }) + assert.NoError(t, err) +} + +func TestBackgroundDoesNotStarveForeground(t *testing.T) { + // bg weight=1, fg weight=4 → bg gets 2/10 slots, fg gets 8/10 slots. + s := newTestSchedulerWithConfig(t, scheduler.Config{ + TotalConcurrency: 10, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }) + s.RegisterType("bg", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priBG}) + s.RegisterType("fg", scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 1, Priority: priFG}) + + bgJobs := make([]*testJob, 2) + for i := range bgJobs { + bgJobs[i] = newTestJob() + s.Submit("bg", fmt.Sprintf("bg%d", i), bgJobs[i].fn) + } + for _, j := range bgJobs { + j.waitStarted(t) + } + + // bg tier is full (2 slots), but fg tier has its own slots. + fg := newTestJob() + s.Submit("fg", "fg1", fg.fn) + fg.waitStarted(t) + + fg.complete() + for _, j := range bgJobs { + j.complete() + } +} + +func TestUnregisteredPriorityPanics(t *testing.T) { + s := newTestScheduler(t) + assert.Panics(t, func() { + s.RegisterType("a", scheduler.JobTypeConfig{ + DefaultCost: time.Second, MaxConcurrency: 1, + Priority: scheduler.Priority{Level: 99, Weight: 1}, + }) + }) +} + +func TestDuplicatePriorityPanics(t *testing.T) { + s := newTestScheduler(t) + assert.Panics(t, func() { + s.RegisterPriority(priFG) + }) +} diff --git a/internal/scheduler/soak_test.go b/internal/scheduler/soak_test.go new file mode 100644 index 0000000..749fb50 --- /dev/null +++ b/internal/scheduler/soak_test.go @@ -0,0 +1,150 @@ +package scheduler_test + +import ( + "context" + "fmt" + "math/rand/v2" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/scheduler" +) + +func TestSoak(t *testing.T) { + if os.Getenv("SOAK_TEST") == "" { + t.Skip("set SOAK_TEST=1 to run soak tests") + } + + const ( + totalConcurrency = 8 + numClients = 10 + numRepos = 5 + soakDuration = 30 * time.Second + maxJobLatency = 5 * time.Millisecond + ) + + soakFG := scheduler.Priority{Level: 10, Weight: 4} + soakBG := scheduler.Priority{Level: 3, Weight: 1} + + type jobTypeInfo struct { + name scheduler.JobType + config scheduler.JobTypeConfig + } + + types := []jobTypeInfo{ + {name: "fgFetch", config: scheduler.JobTypeConfig{DefaultCost: 5 * time.Second, MaxConcurrency: 0.5, Priority: soakFG, ConflictGroup: "git"}}, + {name: "fgClone", config: scheduler.JobTypeConfig{DefaultCost: 10 * time.Second, MaxConcurrency: 0.3, Priority: soakFG, ConflictGroup: "git"}}, + {name: "bgRepack", config: scheduler.JobTypeConfig{DefaultCost: 20 * time.Second, MaxConcurrency: 0.5, Priority: soakBG, ConflictGroup: "git"}}, + {name: "bgSnapshot", config: scheduler.JobTypeConfig{DefaultCost: 5 * time.Second, MaxConcurrency: 1, Priority: soakBG}}, + {name: "fgDownload", config: scheduler.JobTypeConfig{DefaultCost: 3 * time.Second, MaxConcurrency: 0.8, Priority: soakFG}}, + } + + s := newCustomPriorityScheduler(t, scheduler.Config{ + TotalConcurrency: totalConcurrency, + Alpha: 0.3, + FairnessTTL: time.Hour, + CostTTL: time.Hour, + CleanupInterval: time.Hour, + }, soakFG, soakBG) + for _, jt := range types { + s.RegisterType(jt.name, jt.config) + } + + // Invariant tracking: per-repo conflict group concurrency. + type conflictKey struct { + repo string + conflictGroup scheduler.ConflictGroup + } + var conflictMu sync.Mutex + conflictCounts := make(map[conflictKey]int) + var conflictViolations atomic.Int64 + + var totalSubmitted atomic.Int64 + var totalCompleted atomic.Int64 + var totalCancelled atomic.Int64 + var totalErrors atomic.Int64 + + ctx, cancel := context.WithTimeout(testContext(), soakDuration) + defer cancel() + + var wg sync.WaitGroup + for clientID := range numClients { + wg.Go(func() { + fairnessKey := fmt.Sprintf("client-%d", clientID) + rng := rand.New(rand.NewPCG(uint64(clientID), uint64(clientID+42))) //nolint:gosec + + for ctx.Err() == nil { + jt := types[rng.IntN(len(types))] + repo := fmt.Sprintf("repo-%d", rng.IntN(numRepos)) + useSync := rng.IntN(3) > 0 // 2/3 sync, 1/3 async + + jobFn := func(_ context.Context) error { + // Check conflict group concurrency. + if jt.config.ConflictGroup != "" { + ck := conflictKey{repo: repo, conflictGroup: jt.config.ConflictGroup} + conflictMu.Lock() + conflictCounts[ck]++ + if conflictCounts[ck] > 1 { + conflictViolations.Add(1) + } + conflictMu.Unlock() + defer func() { + conflictMu.Lock() + conflictCounts[ck]-- + conflictMu.Unlock() + }() + } + + time.Sleep(time.Duration(rng.Int64N(int64(maxJobLatency)))) + return nil + } + + totalSubmitted.Add(1) + if useSync { + err := s.RunSync(ctx, jt.name, repo, fairnessKey, jobFn) + if err != nil { + if ctx.Err() != nil { + totalCancelled.Add(1) + return + } + totalErrors.Add(1) + } + totalCompleted.Add(1) + } else { + done := make(chan struct{}) + s.Submit(jt.name, repo, func(ctx context.Context) error { + defer close(done) + err := jobFn(ctx) + if err != nil { + totalErrors.Add(1) + } + totalCompleted.Add(1) + return err + }) + // Wait for async job so we don't flood the queue unboundedly. + select { + case <-done: + case <-ctx.Done(): + totalCancelled.Add(1) + return + } + } + } + }) + } + + wg.Wait() + + t.Logf("submitted=%d completed=%d cancelled=%d errors=%d conflict_violations=%d", + totalSubmitted.Load(), totalCompleted.Load(), totalCancelled.Load(), totalErrors.Load(), + conflictViolations.Load()) + + assert.Equal(t, int64(0), conflictViolations.Load(), "conflict group exclusion violated") + assert.True(t, totalCompleted.Load() > 0, "no jobs completed") + assert.Equal(t, int64(0), totalErrors.Load(), "unexpected job errors") +}