Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,13 @@ Statistical sampler that builds hour-of-day distributions (24 distributions, one

- Uses stable-baselines3 PPO with custom ComputeClusterEnv
- Environment simulates 2-week episodes (336 hours) with hourly decisions
- State space includes node counts, job queue, electricity prices, and time
- Action space controls the number of nodes to bring online/offline
- State space includes node counts, job queue, electricity prices, pending job statistics (count, core-hours, avg duration, max nodes), and backlog size
- Action space: `[action_type, magnitude, do_refill]` - controls nodes online/offline and whether to refill the job queue from the backlog
- Rewards balance efficiency, cost savings, and resource utilization
- Cluster configuration: 335 nodes max, 96 cores per node, up to 16 nodes per job
- Job queue: max 1000 jobs, max 1500 new jobs per hour, max 170h runtime
- Job queue: max 1000 jobs, max 1500 new jobs per hour, max 170h runtime; overflow goes to backlog
- Power consumption: 150W idle, 450W used per node
- Baseline comparison: greedy scheduler that keeps all nodes on and processes jobs FIFO

## Evaluation Metrics

Expand Down
12 changes: 11 additions & 1 deletion src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,21 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
metrics.baseline_jobs_submitted += new_jobs_count
metrics.episode_baseline_jobs_submitted += new_jobs_count

_, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
)

# Greedy loop: keep refilling from backlog and assigning until no more progress
while len(baseline_backlog_queue) > 0 and num_launched > 0:
baseline_next_empty_slot, moved = fill_queue_from_backlog(job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot)
if moved == 0:
break
num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
)

num_used_nodes = np.sum(baseline_state['nodes'] > 0)
num_on_nodes = np.sum(baseline_state['nodes'] > -1)
num_idle_nodes = num_on_nodes - num_used_nodes
Expand Down
2 changes: 2 additions & 0 deletions src/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def _on_step(self) -> bool:
self.logger.record("metrics/jobs_completed", env.metrics.episode_jobs_completed)
self.logger.record("metrics/completion_rate", completion_rate)
self.logger.record("metrics/avg_wait_hours", avg_wait)
self.logger.record("metrics/on_nodes", env.metrics.episode_on_nodes[-1])
self.logger.record("metrics/used_nodes", env.metrics.episode_used_nodes[-1])
self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached)
self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached)
self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped)
Expand Down
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
WEEK_HOURS = 168

MAX_NODES = 335 # Maximum number of nodes
MAX_QUEUE_SIZE = 1000 # Maximum number of jobs in the queue
MAX_QUEUE_SIZE = 2500 # Maximum number of jobs in the queue
MAX_CHANGE = MAX_NODES
MAX_JOB_DURATION = 170 # maximum job runtime in hours
# Use a very high cap; age-based dropping is temporarily disabled in code.
Expand Down
95 changes: 80 additions & 15 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,12 @@ def __init__(self,
# actions: - change number of available nodes:
# action_type: 0: decrease, 1: maintain, 2: increase
# action_magnitude: 0-MAX_CHANGE (+1ed in the action)
self.action_space = spaces.MultiDiscrete([3, MAX_CHANGE])
# do_refill: 0: don't refill from backlog, 1: refill from backlog
self.action_space = spaces.MultiDiscrete([3, MAX_CHANGE, 2])

self.observation_space = spaces.Dict({
# nodes: [-1: off, 0: idle, >0: booked for n hours]
'nodes': spaces.Box(
low=-1,
high=MAX_JOB_DURATION,
shape=(MAX_NODES,),
dtype=np.int32
),
'nodes': spaces.Box(low=-1, high=MAX_JOB_DURATION, shape=(MAX_NODES,), dtype=np.int32),
# job queue: [job duration, job age, job nodes, job cores per node, ...]
'job_queue': spaces.Box(
low=0,
Expand All @@ -159,12 +155,13 @@ def __init__(self,
dtype=np.int32
),
# predicted prices for the next 24h
'predicted_prices': spaces.Box(
low=-1000,
high=1000,
shape=(24,),
dtype=np.float32
),
'predicted_prices': spaces.Box(low=-1000, high=1000, shape=(24,), dtype=np.float32),
# Summary statistics for all outstanding jobs (queue + backlog)
'pending_job_count': spaces.Box(low=0, high=np.iinfo(np.int32).max, shape=(1,), dtype=np.int32),
'pending_core_hours': spaces.Box(low=0, high=np.finfo(np.float32).max, shape=(1,), dtype=np.float32),
'pending_avg_duration': spaces.Box(low=0, high=MAX_JOB_DURATION, shape=(1,), dtype=np.float32),
'pending_max_nodes': spaces.Box(low=0, high=MAX_NODES_PER_JOB, shape=(1,), dtype=np.int32),
'backlog_size': spaces.Box(low=0, high=np.iinfo(np.int32).max, shape=(1,), dtype=np.int32),
})

def _reset_timeline_state(self, start_index):
Expand All @@ -177,6 +174,12 @@ def _reset_timeline_state(self, start_index):
'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32),
# Initialize predicted prices array
'predicted_prices': self.prices.predicted_prices.copy(),
# Summary statistics for all outstanding jobs (queue + backlog)
'pending_job_count': np.array([0], dtype=np.int32),
'pending_core_hours': np.array([0.0], dtype=np.float32),
'pending_avg_duration': np.array([0.0], dtype=np.float32),
'pending_max_nodes': np.array([0], dtype=np.int32),
'backlog_size': np.array([0], dtype=np.int32),
}

self.baseline_state = {
Expand All @@ -200,6 +203,50 @@ def _reset_timeline_state(self, start_index):
self.next_empty_slot = 0
self.baseline_next_empty_slot = 0

def _update_pending_job_stats(self, job_queue_2d):
"""Update summary statistics for all outstanding jobs (queue + backlog)."""
# Collect stats from the main queue
active_jobs_mask = job_queue_2d[:, 0] > 0
queue_durations = job_queue_2d[active_jobs_mask, 0]
queue_nodes = job_queue_2d[active_jobs_mask, 2]
queue_cores = job_queue_2d[active_jobs_mask, 3]
queue_count = len(queue_durations)

# Collect stats from the backlog
backlog_count = len(self.backlog_queue)
if backlog_count > 0:
backlog_arr = np.array(list(self.backlog_queue))
backlog_durations = backlog_arr[:, 0]
backlog_nodes = backlog_arr[:, 2]
backlog_cores = backlog_arr[:, 3]
else:
backlog_durations = np.array([], dtype=np.int32)
backlog_nodes = np.array([], dtype=np.int32)
backlog_cores = np.array([], dtype=np.int32)

# Combine stats
total_count = queue_count + backlog_count
if total_count > 0:
all_durations = np.concatenate([queue_durations, backlog_durations])
all_nodes = np.concatenate([queue_nodes, backlog_nodes])
all_cores = np.concatenate([queue_cores, backlog_cores])

# Core-hours = sum of (duration * nodes * cores_per_node)
total_core_hours = np.sum(all_durations * all_nodes * all_cores)
avg_duration = np.mean(all_durations)
max_nodes = np.max(all_nodes)
else:
total_core_hours = 0.0
avg_duration = 0.0
max_nodes = 0

# Update state
self.state['pending_job_count'][0] = total_count
self.state['pending_core_hours'][0] = total_core_hours
self.state['pending_avg_duration'][0] = avg_duration
self.state['pending_max_nodes'][0] = max_nodes
self.state['backlog_size'][0] = backlog_count

def reset(self, seed=None, options=None):
if options is None:
options = {}
Expand Down Expand Up @@ -294,10 +341,10 @@ def step(self, action):
self.env_print(f">>> adding {len(new_jobs)} new jobs to the queue: {' '.join(['[{}h {} {}x{}]'.format(d, a, n, c) for d, a, n, c in new_jobs])}")
self.env_print("job_queue: ", ' '.join(['[{} {} {} {}]'.format(d, a, n, c) for d, a, n, c in job_queue_2d if d > 0]))

action_type, action_magnitude = action
action_type, action_magnitude, do_refill = action
action_magnitude += 1

self.env_print(f"[3] Adjusting nodes based on action: type={action_type}, magnitude={action_magnitude}...")
self.env_print(f"[3] Adjusting nodes based on action: type={action_type}, magnitude={action_magnitude}, refill={do_refill}...")
num_node_changes = adjust_nodes(action_type, action_magnitude, self.state['nodes'], self.cores_available, self.env_print)

# Assign jobs to available nodes
Expand All @@ -310,6 +357,24 @@ def step(self, action):

self.env_print(f" {num_launched_jobs} jobs launched")

# Refill queue from backlog if agent chose to do so
if do_refill == 1 and len(self.backlog_queue) > 0:
self.next_empty_slot, moved = fill_queue_from_backlog(job_queue_2d, self.backlog_queue, self.next_empty_slot)
if moved > 0:
self.env_print(f" {moved} jobs moved from backlog to queue")
# Try to assign the newly queued jobs
extra_launched, self.next_empty_slot, extra_dropped, self.next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
)
num_launched_jobs += extra_launched
num_dropped_this_step += extra_dropped
if extra_launched > 0:
self.env_print(f" {extra_launched} additional jobs launched from backlog")

# Update summary statistics for all outstanding jobs (queue + backlog)
self._update_pending_job_stats(job_queue_2d)

# Calculate node utilization stats
num_used_nodes = np.sum(self.state['nodes'] > 0)
num_on_nodes = np.sum(self.state['nodes'] >= 0)
Expand Down
2 changes: 1 addition & 1 deletion test/test_sanity_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def make_env_with_carry(carry_over_state, env_cls=ComputeClusterEnv):
# -------------------------------------

seed = 123
action = np.array([1, 0], dtype=np.int64) # "maintain, magnitude 1" effectively
action = np.array([1, 0, 1], dtype=np.int64) # "maintain, magnitude 1, refill" effectively

env = make_env_with_carry(False, env_cls=DeterministicPriceEnv)

Expand Down