
Backlogger: Add refill from backlogger during job assignment #20

Open
enlorenz wants to merge 3 commits into FairRootGroup:master from enlorenz:wip/integration

Conversation

@enlorenz

No description provided.

@enlorenz enlorenz marked this pull request as draft January 30, 2026 12:40
@coderabbitai
Contributor

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough

Adds a backlog-aware wrapper for job assignment, integrates it into environment and baseline call sites, exposes backlog_size/backlog_assigned in observations, and logs two new node-utilization metrics at episode end.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Job assignment wrapper (src/job_management.py) | Adds assign_jobs_with_backlog_refill(...): repeatedly calls assignment, refills main queue from backlog when empty, accumulates launched/dropped counts, and returns backlog_assigned (extra return value). Also adds age_waiting_jobs param to existing assign function. |
| Environment integration (src/environment.py) | Replaces call to assign_jobs_to_available_nodes with assign_jobs_with_backlog_refill(...), passes backlog_queue, captures backlog_assigned, and adds backlog_size / backlog_assigned to observation space and state initialization/updates. |
| Baseline integration (src/baseline.py) | Imports and calls assign_jobs_with_backlog_refill(...) instead of the old function, reorders/expands arguments to include metrics and baseline_backlog_queue, and captures the extra return value. |
| Metrics logging (src/callbacks.py) | Appends metrics/on_nodes and metrics/used_nodes to end-of-episode logging, pulling last episode values (or 0 if empty). |
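
The metrics-logging row describes a "last value, or 0 if empty" rule. A minimal sketch of that rule, assuming the per-episode values are kept in plain lists; `last_or_zero` and `episode_end_metrics` are hypothetical names for illustration and are not taken from src/callbacks.py:

```python
def last_or_zero(values):
    """Return the most recent value, or 0 when nothing was recorded."""
    return values[-1] if values else 0


def episode_end_metrics(episode_on_nodes, episode_used_nodes):
    """Build the two end-of-episode entries named in the walkthrough.

    Both arguments are assumed to be lists with one value per finished episode.
    """
    return {
        "metrics/on_nodes": last_or_zero(episode_on_nodes),
        "metrics/used_nodes": last_or_zero(episode_used_nodes),
    }
```

The fallback to 0 keeps the logging call from raising an IndexError when the callback fires before any episode has completed.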

Sequence Diagram(s)

mermaid
sequenceDiagram
    participant Env as Environment
    participant JM as JobManagement
    participant Backlog as BacklogQueue
    participant Nodes as Nodes
    Env->>JM: assign_jobs_with_backlog_refill(job_queue, nodes, cores, running, next_slot, next_job_id, metrics, backlog_queue, is_baseline)
    JM->>Nodes: assign_jobs_to_available_nodes(...) - attempt assigns
    alt main queue emptied and backlog has jobs
        JM->>Backlog: move jobs into main queue
        JM->>Nodes: assign_jobs_to_available_nodes(...) - assign newly filled jobs
        loop until no progress
        end
    end
    JM-->>Env: (total_launched, next_empty_slot, total_dropped, next_job_id, backlog_assigned)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Description check | ❓ Inconclusive | No pull request description was provided by the author, so the check cannot confirm if description content relates to the changeset. | Add a pull request description explaining the purpose and context of the backlog refill feature and why it was implemented. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'Backlogger: Add refill from backlogger during job assignment' directly and specifically describes the main change: introducing backlog refill functionality during job assignment across multiple files. |

@enlorenz enlorenz marked this pull request as ready for review January 30, 2026 12:45
@enlorenz enlorenz marked this pull request as draft January 30, 2026 15:17
Enis Lorenz added 2 commits January 30, 2026 16:18
…obs, refill queue and continue assigning until resources are filled up.

Add backlog refill loop helper for job assignment

- centralize refill/assign loop in job_management
- use progress-based break (launched or moved from backlog)
- update environment and baseline to call helper
@enlorenz enlorenz marked this pull request as ready for review January 30, 2026 15:25
@enlorenz enlorenz marked this pull request as draft January 30, 2026 16:02

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@src/environment.py`:
- Around line 320-330: The backlog_assigned metric undercounts because
fill_queue_from_backlog runs earlier and its assigned jobs aren’t included in
the value returned by assign_jobs_with_backlog_refill; fix by capturing the
number of jobs pulled during the earlier prefill (e.g., record count from
fill_queue_from_backlog or len change on backlog_queue) and pass that into or
seed the wrapper used by assign_jobs_with_backlog_refill (or change the wrapper
signature to accept an initial_prefill_count) so that backlog_assigned =
prefill_count + wrapper_returned_backlog_assigned (or compute a combined value)
and store that combined value in self.state['backlog_assigned'][0].
- Around line 161-172: Observation lacks explicit time info: add a time feature
to the observation_space dict (e.g., 'time_of_day' or 'episode_hour') alongside
existing keys like 'backlog_size' and 'backlog_assigned', and ensure the
environment updates this feature on every reset() and step() (or inside the
existing _get_observation/_observe method) so the returned observation contains
the current time-of-day or hour-of-episode value; update observation_space
dtype/shape appropriately (e.g., Box(low=0, high=23, shape=(1,), dtype=np.int32)
or normalized float) and set the value during reset() initialization and each
step() before returning observations.
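
The backlog_assigned item above can be read as seeding the wrapper's counter with the earlier prefill. A minimal sketch of that accounting, assuming a hypothetical pure function `count_backlog_assigned` that replays the per-pass numbers. It mirrors the `backlog_loaded_remaining` bookkeeping of the wrapper but is not the real code, and `initial_prefill_count` is the parameter name floated in the review, not an existing argument:

```python
def count_backlog_assigned(passes, initial_prefill_count=0):
    """Replay the wrapper's backlog accounting.

    passes: list of (num_launched, moved_from_backlog) tuples, one per
            assignment pass within a single environment step.
    initial_prefill_count: jobs moved from the backlog into the queue
            before the wrapper was called (assumed to be known).
    """
    backlog_assigned = 0
    backlog_loaded_remaining = initial_prefill_count
    for num_launched, moved_from_backlog in passes:
        # Launches are attributed to backlog-origin jobs first.
        if backlog_loaded_remaining > 0 and num_launched > 0:
            assigned = min(num_launched, backlog_loaded_remaining)
            backlog_assigned += assigned
            backlog_loaded_remaining -= assigned
        # Jobs refilled in this pass can only launch in a later pass.
        backlog_loaded_remaining += moved_from_backlog
    return backlog_assigned
```

Seeding the counter only credits prefilled jobs that actually launch, whereas simply adding the prefill count to the returned value would also count jobs still waiting. Attributing launches to backlog jobs first is an approximation whenever the queue was not empty before the prefill.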

In `@src/job_management.py`:
- Around line 258-296: The backlog refill path can exceed the 1000-job queue cap
and skip per-hour/new-job and max-duration checks; update
assign_jobs_with_backlog_refill (and the helper fill_queue_from_backlog) to
enforce limits before moving jobs from backlog: compute combined_size =
current_queue_count(job_queue_2d) + len(backlog_queue) and only move
min(moved_from_backlog, 1000 - current_queue_count) jobs, validate each job's
new_job_count <= 1500_per_hour and runtime <= 170_hours before enqueueing,
reject or drop any jobs that violate these rules (increment total_dropped_jobs
and backlog_assigned appropriately), and ensure metrics/logging are updated when
jobs are dropped or skipped.
- Around line 270-295: The loop currently re-calls
assign_jobs_to_available_nodes multiple times per step, which causes wait ages
to be incremented repeatedly; change the loop so it only iterates again when new
jobs were actually moved from backlog into the queue. Concretely, after calling
assign_jobs_to_available_nodes and optionally fill_queue_from_backlog, break out
of the while loop unless moved_from_backlog > 0 (i.e., only continue the loop
when fill_queue_from_backlog added jobs); update the loop's termination
condition that currently checks num_launched_jobs and moved_from_backlog so it
instead stops immediately when moved_from_backlog == 0 to prevent re‑aging,
referencing assign_jobs_to_available_nodes and fill_queue_from_backlog and the
moved_from_backlog variable.
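
One way to avoid the re-aging described in the last item is to age waiting jobs only on the first pass of the loop. A minimal sketch under stated assumptions: jobs are plain dicts with an "age" field, the queue and backlog are deques, and `assign_once` / `assign_with_refill` are simplified stand-ins rather than the real functions in src/job_management.py. Only the `age_waiting_jobs` flag name comes from the walkthrough:

```python
from collections import deque


def assign_once(queue, free_slots, age_waiting_jobs=True):
    """Launch jobs while slots remain; optionally age the jobs left waiting."""
    launched = 0
    while queue and launched < free_slots:
        queue.popleft()
        launched += 1
    if age_waiting_jobs:
        for job in queue:
            job["age"] += 1
    return launched


def assign_with_refill(queue, backlog, free_slots, queue_capacity):
    """Assign, refill from the backlog when the queue empties, and repeat."""
    total_launched = 0
    first_pass = True
    while True:
        launched = assign_once(
            queue, free_slots - total_launched, age_waiting_jobs=first_pass
        )
        first_pass = False  # later passes in the same step must not age again
        total_launched += launched

        moved = 0
        if not queue and backlog and total_launched < free_slots:
            while backlog and len(queue) < queue_capacity:
                queue.append(backlog.popleft())
                moved += 1

        # Progress-based break: stop when nothing launched and nothing moved.
        if launched == 0 and moved == 0:
            break
    return total_launched
```

With this flag, a job that stays in the queue for the whole step ages by exactly one, however many passes the loop makes. In this sketch, jobs refilled from the backlog mid-step are not aged until the next step, which is a design assumption rather than confirmed behaviour.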

Comment on lines +161 to +172
            'backlog_size': spaces.Box(
                low=0,
                high=np.iinfo(np.int32).max,
                shape=(1,),
                dtype=np.int32
            ),
            'backlog_assigned': spaces.Box(
                low=0,
                high=np.iinfo(np.int32).max,
                shape=(1,),
                dtype=np.int32
            ),

⚠️ Potential issue | 🟠 Major

Add time features to the observation space.

The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.

✅ Suggested addition (example)
             'predicted_prices': spaces.Box(
                 low=-1000,
                 high=1000,
                 shape=(24,),
                 dtype=np.float32
             ),
+            'time': spaces.Box(
+                low=0,
+                high=EPISODE_HOURS - 1,
+                shape=(1,),
+                dtype=np.int32
+            ),
         })
Based on learnings: State space representation should include node counts, job queue status, electricity prices, and time information.
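
The suggestion covers the observation-space entry; the value also has to be maintained in reset() and step(). A minimal sketch of that bookkeeping, assuming a hypothetical `TimeFeature` helper and a placeholder episode length. The real EPISODE_HOURS value is not shown in this conversation, and a real environment would store the value in an np.int32 array rather than a list:

```python
EPISODE_HOURS = 336  # placeholder value for illustration only


class TimeFeature:
    """Tracks the hour-of-episode value exposed under the 'time' key."""

    def __init__(self, episode_hours=EPISODE_HOURS):
        self.episode_hours = episode_hours
        self.state = {"time": [0]}

    def reset(self):
        # Every episode starts at hour 0.
        self.state["time"][0] = 0
        return self.state

    def step(self):
        # Advance one hour, clamped to the Box upper bound (episode_hours - 1).
        next_hour = self.state["time"][0] + 1
        self.state["time"][0] = min(next_hour, self.episode_hours - 1)
        return self.state

    def hour_of_day(self):
        # Optional derived feature if hour-of-day is preferred.
        return self.state["time"][0] % 24
```

Clamping keeps the value inside the declared bounds even if step() is called once more after the final hour.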

Comment on lines 258 to 299
def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                    next_empty_slot, next_job_id, metrics, backlog_queue,
                                    is_baseline=False):
    """
    Assign jobs, refilling the queue from backlog when it becomes empty.
    Loop until no progress is made or resources/backlog are exhausted.
    """
    total_launched_jobs = 0
    total_dropped_jobs = 0
    backlog_assigned = 0
    backlog_loaded_remaining = 0

    while True:
        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline
        )
        total_launched_jobs += num_launched_jobs
        total_dropped_jobs += num_dropped
        if backlog_loaded_remaining > 0 and num_launched_jobs > 0:
            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining)
            backlog_assigned += assigned_from_backlog
            backlog_loaded_remaining -= assigned_from_backlog

        queue_empty = np.all(job_queue_2d[:, 0] == 0)
        backlog_has_jobs = len(backlog_queue) > 0
        resources_available = np.any((nodes >= 0) & (cores_available > 0))

        moved_from_backlog = 0
        if queue_empty and backlog_has_jobs and resources_available:
            next_empty_slot, moved_from_backlog = fill_queue_from_backlog(
                job_queue_2d, backlog_queue, next_empty_slot
            )
            backlog_loaded_remaining += moved_from_backlog

        if num_launched_jobs == 0 and moved_from_backlog == 0:
            break

    return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned

⚠️ Potential issue | 🟠 Major

Backlog refill can violate the 1000‑job queue cap.

With backlog enabled, overflow jobs can accumulate beyond the fixed queue size, which risks exceeding the 1000‑job limit (and potentially the per‑hour and max‑duration constraints) unless those limits are enforced elsewhere. Please cap combined queue+backlog size and validate new job counts/durations before enqueueing or refilling.
As per coding guidelines: Job queue constraints must enforce maximum 1000 jobs, maximum 1500 new jobs per hour, and maximum 170h runtime per job.
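
A minimal sketch of a refill step that respects the queue cap and the runtime limit quoted above. It assumes jobs are dicts with a "duration" field in hours and that queue and backlog are deques; `refill_with_limits` is a hypothetical helper, not the existing fill_queue_from_backlog. The per-hour limit on new jobs is left out because it would apply where jobs are generated, not where they are refilled:

```python
from collections import deque

MAX_QUEUE_SIZE = 1000         # queue cap quoted from the coding guidelines
MAX_JOB_DURATION_HOURS = 170  # runtime cap quoted from the coding guidelines


def refill_with_limits(queue, backlog,
                       max_queue_size=MAX_QUEUE_SIZE,
                       max_duration=MAX_JOB_DURATION_HOURS):
    """Move jobs from backlog to queue without exceeding the limits.

    Returns (moved, dropped): jobs enqueued and jobs rejected for
    exceeding the maximum runtime.
    """
    moved = 0
    dropped = 0
    while backlog and len(queue) < max_queue_size:
        job = backlog.popleft()
        if job["duration"] > max_duration:
            dropped += 1  # over-long jobs are dropped, not enqueued
            continue
        queue.append(job)
        moved += 1
    return moved, dropped
```

Jobs that do not fit stay in the backlog for a later step, so only the main queue is bounded here. Capping the combined queue plus backlog size, as the comment also suggests, would need an additional check where jobs enter the backlog.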

Agent now has two extra integer variables in observation space to know of the existence and the length of the backlog_queue.

Flagged assign_jobs so it only ages waiting jobs once per step
@enlorenz enlorenz marked this pull request as ready for review February 2, 2026 08:46
@rbx rbx mentioned this pull request Feb 4, 2026