From 4c767f075a891a1c04d9e3ace170b1e943c3030c Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 16:09:32 -0500 Subject: [PATCH 1/9] Rename some methods and move some to the base class in preparation for some reuse by ParallelTrialScheduler --- .../mlos_bench/schedulers/base_scheduler.py | 168 +++++++++++++----- .../mlos_bench/schedulers/sync_scheduler.py | 18 +- .../mlos_bench/schedulers/trial_runner.py | 12 +- 3 files changed, 137 insertions(+), 61 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index eaa5527c6d6..d64f4e812d4 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -240,7 +240,6 @@ def __exit__( self._in_context = False return False # Do not suppress exceptions - @abstractmethod def start(self) -> None: """Start the scheduling loop.""" assert self.experiment is not None @@ -255,13 +254,55 @@ def start(self) -> None: if self._config_id > 0: tunables = self.load_tunable_config(self._config_id) - self.schedule_trial(tunables) + # If a config_id is provided, assume it is expected to be run immediately. + self.add_trial_to_queue(tunables, ts_start=datetime.now(UTC)) + + is_warm_up: bool = self.optimizer.supports_preload + if not is_warm_up: + _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) + + not_done: bool = True + while not_done: + _LOG.info( + "Optimization loop: Longest finished trial sequence ID: %d", + self._longest_finished_trial_sequence_id, + ) + self.run_schedule(is_warm_up) + self.wait_for_trial_runners() + not_done = self.add_new_optimizer_suggestions() + self.assign_trial_runners( + self.experiment.pending_trials( + datetime.now(UTC), + running=False, + trial_runner_assigned=False, + ) + ) + is_warm_up = False + self.wait_for_trial_runners(wait_all=True) + + @abstractmethod + def wait_for_trial_runners(self, wait_all: bool = False) -> None: + """ + Wait for (enough) TrialRunners to finish. + + This is a blocking call that waits for enough of the the TrialRunners to finish. + The base class implementation waits for all of the TrialRunners to finish. + However this can be overridden in subclasses to implement a more asynchronous behavior. + + Parameters + ---------- + wait_all : bool + If True, wait for all TrialRunners to finish. + If False, wait for "enough" TrialRunners to finish (which for the + base class is all of them). + """ def teardown(self) -> None: """ Tear down the TrialRunners/Environment(s). - Call it after the completion of the `.start()` in the scheduler context. + Call it after the completion of the :py:meth:`Scheduler.start` in the + Scheduler context. """ assert self.experiment is not None if self._do_teardown: @@ -287,13 +328,20 @@ def load_tunable_config(self, config_id: int) -> TunableGroups: _LOG.debug("Config %d ::\n%s", config_id, json.dumps(tunable_values, indent=2)) return tunables.copy() - def _schedule_new_optimizer_suggestions(self) -> bool: + def add_new_optimizer_suggestions(self) -> bool: """ Optimizer part of the loop. - Load the results of the executed trials into the optimizer, suggest new - configurations, and add them to the queue. Return True if optimization is not - over, False otherwise. + Load the results of the executed trials into the + :py:class:`~.Optimizer`, suggest new configurations, and add them to the + queue. 
+ + Returns + ------- + bool + The return value indicates whether the optimization process should + continue to get suggestions from the Optimizer or not. + See Also: :py:meth:`~.Scheduler.not_done`. """ assert self.experiment is not None (trial_ids, configs, scores, status) = self.experiment.load(self._last_trial_id) @@ -301,40 +349,67 @@ def _schedule_new_optimizer_suggestions(self) -> bool: self.optimizer.bulk_register(configs, scores, status) self._last_trial_id = max(trial_ids, default=self._last_trial_id) + # Check if the optimizer has converged or not. not_done = self.not_done() if not_done: tunables = self.optimizer.suggest() - self.schedule_trial(tunables) - + self.add_trial_to_queue(tunables) return not_done - def schedule_trial(self, tunables: TunableGroups) -> None: - """Add a configuration to the queue of trials.""" - # TODO: Alternative scheduling policies may prefer to expand repeats over - # time as well as space, or adjust the number of repeats (budget) of a given - # trial based on whether initial results are promising. + def add_trial_to_queue( + self, + tunables: TunableGroups, + ts_start: datetime | None = None, + ) -> None: + """ + Add a configuration to the queue of trials 1 or more times. + + (e.g., according to the :py:attr:`~.Scheduler.trial_config_repeat_count`) + + Parameters + ---------- + tunables : TunableGroups + The tunable configuration to add to the queue. + + ts_start : datetime | None + Optional timestamp to use to start the trial. + + Notes + ----- + Alternative scheduling policies may prefer to expand repeats over + time as well as space, or adjust the number of repeats (budget) of a given + trial based on whether initial results are promising. + """ for repeat_i in range(1, self._trial_config_repeat_count + 1): self._add_trial_to_queue( tunables, - config={ - # Add some additional metadata to track for the trial such as the - # optimizer config used. - # Note: these values are unfortunately mutable at the moment. - # Consider them as hints of what the config was the trial *started*. - # It is possible that the experiment configs were changed - # between resuming the experiment (since that is not currently - # prevented). - "optimizer": self.optimizer.name, - "repeat_i": repeat_i, - "is_defaults": tunables.is_defaults(), - **{ - f"opt_{key}_{i}": val - for (i, opt_target) in enumerate(self.optimizer.targets.items()) - for (key, val) in zip(["target", "direction"], opt_target) - }, - }, + ts_start=ts_start, + config=self._augment_trial_config_metadata(tunables, repeat_i), ) + def _augment_trial_config_metadata( + self, + tunables: TunableGroups, + repeat_i: int, + ) -> dict[str, Any]: + return { + # Add some additional metadata to track for the trial such as the + # optimizer config used. + # Note: these values are unfortunately mutable at the moment. + # Consider them as hints of what the config was the trial *started*. + # It is possible that the experiment configs were changed + # between resuming the experiment (since that is not currently + # prevented). + "optimizer": self.optimizer.name, + "repeat_i": repeat_i, + "is_defaults": tunables.is_defaults(), + **{ + f"opt_{key}_{i}": val + for (i, opt_target) in enumerate(self.optimizer.targets.items()) + for (key, val) in zip(["target", "direction"], opt_target) + }, + } + def _add_trial_to_queue( self, tunables: TunableGroups, @@ -352,10 +427,10 @@ def _add_trial_to_queue( def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None: """ - Assigns TrialRunners to the given Trial in batch. 
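
The nested comprehension in `_augment_trial_config_metadata()` above flattens the optimizer targets into indexed keys. Assuming a hypothetical optimizer named "mlos_core" with two targets `{"score": "min", "throughput": "max"}` and `repeat_i=2`, the metadata attached to the trial config would look roughly like this (the optimizer name and target names are illustrative, not taken from this patch):

```python
# Illustrative output of _augment_trial_config_metadata() for a hypothetical
# optimizer named "mlos_core" with targets {"score": "min", "throughput": "max"}.
trial_config_metadata = {
    "optimizer": "mlos_core",      # self.optimizer.name
    "repeat_i": 2,                 # which repeat of the same tunable config this is
    "is_defaults": False,          # whether the tunables are all at their defaults
    "opt_target_0": "score",       # first optimization target and its direction
    "opt_direction_0": "min",
    "opt_target_1": "throughput",  # second optimization target and its direction
    "opt_direction_1": "max",
}
```
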
+ Assigns :py:class:`~.TrialRunner`s to the given :py:class:`~.Trial`s in batch. - The base class implements a simple round-robin scheduling algorithm for each - Trial in sequence. + The base class implements a simple round-robin scheduling algorithm for + each Trial in sequence. Subclasses can override this method to implement a more sophisticated policy. For instance:: @@ -375,6 +450,11 @@ def assign_trial_runners( trial.set_trial_runner(trial_runner) ... + Notes + ----- + Subclasses are *not* required to assign a TrialRunner to the Trial + (e.g., if the Trial should be deferred to a later time). + Parameters ---------- trials : Iterable[Storage.Trial] @@ -411,7 +491,8 @@ def assign_trial_runners( def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner: """ - Gets the TrialRunner associated with the given Trial. + Gets the :py:class:`~.TrialRunner` associated with the given + :py:class:`~.Storage.Trial`. Parameters ---------- @@ -434,25 +515,30 @@ def get_trial_runner(self, trial: Storage.Trial) -> TrialRunner: assert trial_runner.trial_runner_id == trial.trial_runner_id return trial_runner - def _run_schedule(self, running: bool = False) -> None: + def run_schedule(self, running: bool = False) -> None: """ - Scheduler part of the loop. + Runs the current schedule of trials. - Check for pending trials in the queue and run them. + Check for :py:class:`.Trial`s with `:py:attr:`.Status.PENDING` and an + assigned :py:attr:`~.Trial.trial_runner_id` in the queue and run them + with :py:meth:`~.Scheduler.run_trial`. """ assert self.experiment is not None - # Make sure that any pending trials have a TrialRunner assigned. pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running)) - self.assign_trial_runners(pending_trials) for trial in pending_trials: + if trial.trial_runner_id is None: + logging.warning("Trial %s has no TrialRunner assigned yet.") + continue self.run_trial(trial) def not_done(self) -> bool: """ Check the stopping conditions. - By default, stop when the optimizer converges or max limit of trials reached. + By default, stop when the :py:class:`.Optimizer` converges or the limit + of :py:attr:`~.Scheduler.max_trials` is reached. """ + # TODO: Add more stopping conditions: https://github.com/microsoft/MLOS/issues/427 return self.optimizer.not_converged() and ( self._trial_count < self._max_trials or self._max_trials <= 0 ) diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py index 4b864942dce..36ba2973d55 100644 --- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py @@ -15,24 +15,10 @@ class SyncScheduler(Scheduler): """A simple single-threaded synchronous optimization loop implementation.""" - def start(self) -> None: - """Start the optimization loop.""" - super().start() - - is_warm_up = self.optimizer.supports_preload - if not is_warm_up: - _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) - - not_done = True - while not_done: - _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) - self._run_schedule(is_warm_up) - not_done = self._schedule_new_optimizer_suggestions() - is_warm_up = False - def run_trial(self, trial: Storage.Trial) -> None: """ - Set up and run a single trial. + Set up and run a single :py:class:`~.Storage.Trial` on its + :py:class:`~.TrialRunner`. Save the results in the storage. 
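
Coming back to `assign_trial_runners()` above: the "simple round-robin scheduling algorithm" it describes boils down to something like the following sketch (illustrative only; the actual base-class implementation also logs its decisions and handles Trials that already carry an assignment):

```python
# Minimal round-robin sketch of what assign_trial_runners() does in the base
# class (illustrative, not the actual library code).
from collections.abc import Iterable
from itertools import cycle

from mlos_bench.schedulers.trial_runner import TrialRunner
from mlos_bench.storage.base_storage import Storage


def round_robin_assign(trials: Iterable[Storage.Trial], trial_runners: list[TrialRunner]) -> None:
    runners = cycle(trial_runners)
    for trial in trials:
        if trial.trial_runner_id is not None:
            continue  # leave existing assignments alone
        trial.set_trial_runner(next(runners))
```
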
""" diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 80eb696bc6d..63c15a0e1f9 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -20,6 +20,7 @@ from mlos_bench.services.types import SupportsConfigLoading from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups +from mlos_bench.tunables.tunable_types import TunableValue _LOG = logging.getLogger(__name__) @@ -168,7 +169,7 @@ def run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ) -> None: + ) -> tuple[Status, datetime, dict[str, TunableValue] | None]: """ Run a single trial on this TrialRunner's Environment and stores the results in the backend Trial Storage. @@ -198,9 +199,10 @@ def run_trial( if not self.environment.setup(trial.tunables, trial.config(global_config)): _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) # FIXME: Use the actual timestamp from the environment. - _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED) - trial.update(Status.FAILED, datetime.now(UTC)) - return + (status, timestamp, results) = (Status.FAILED, datetime.now(UTC), None) + _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, status) + trial.update(status, timestamp) + return (status, timestamp, results) # TODO: start background status polling of the environments in the event loop. @@ -221,6 +223,8 @@ def run_trial( self._is_running = False + return (status, timestamp, results) + def teardown(self) -> None: """ Tear down the Environment. From 0261125e10168c38916409c43a17ba021113a1ea Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 16:21:35 -0500 Subject: [PATCH 2/9] remove wait_for_trial_runners for now --- .../mlos_bench/schedulers/base_scheduler.py | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index d64f4e812d4..5bbbedfbcac 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -263,12 +263,8 @@ def start(self) -> None: not_done: bool = True while not_done: - _LOG.info( - "Optimization loop: Longest finished trial sequence ID: %d", - self._longest_finished_trial_sequence_id, - ) + _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) self.run_schedule(is_warm_up) - self.wait_for_trial_runners() not_done = self.add_new_optimizer_suggestions() self.assign_trial_runners( self.experiment.pending_trials( @@ -278,24 +274,6 @@ def start(self) -> None: ) ) is_warm_up = False - self.wait_for_trial_runners(wait_all=True) - - @abstractmethod - def wait_for_trial_runners(self, wait_all: bool = False) -> None: - """ - Wait for (enough) TrialRunners to finish. - - This is a blocking call that waits for enough of the the TrialRunners to finish. - The base class implementation waits for all of the TrialRunners to finish. - However this can be overridden in subclasses to implement a more asynchronous behavior. - - Parameters - ---------- - wait_all : bool - If True, wait for all TrialRunners to finish. - If False, wait for "enough" TrialRunners to finish (which for the - base class is all of them). 
- """ def teardown(self) -> None: """ From 5db8fb2a5c3f043152a9410017718a3f4ea1b5ef Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 16:21:52 -0500 Subject: [PATCH 3/9] remove vscode setting --- .vscode/settings.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 23e43fed683..9be9b509175 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -146,8 +146,7 @@ "isort.check": true, "[python]": { "editor.codeActionsOnSave": { - "source.organizeImports": "explicit", - "source.unusedImports": "explicit" + "source.organizeImports": "explicit" }, "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true, From 540cbcef63a9e769c79d6d366f9eb35001e826cd Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 16:23:03 -0500 Subject: [PATCH 4/9] comments --- mlos_bench/mlos_bench/storage/base_storage.py | 17 +++++++-- .../mlos_bench/storage/sql/experiment.py | 37 ++++++++++++------- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index f2d393994f7..460a8ce488f 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -313,18 +313,27 @@ def pending_trials( timestamp: datetime, *, running: bool, + trial_runner_assigned: bool | None = None, ) -> Iterator["Storage.Trial"]: """ - Return an iterator over the pending trials that are scheduled to run on or - before the specified timestamp. + Return an iterator over the :py:class:`~.Storage.Trial`s that are + :py:attr:`~.Status.PENDING` and have a scheduled + :py:attr:`~.Storage.Trial.ts_start` time to run on or before the specified + timestamp. Parameters ---------- timestamp : datetime.datetime - The time in UTC to check for scheduled trials. + The time in UTC to check for scheduled Trials. running : bool - If True, include the trials that are already running. + If True, include the Trials that are also + :py:attr:`~.Status.RUNNING` or :py:attr:`~.Status.READY`. Otherwise, return only the scheduled trials. + trial_runner_assigned : bool | None + If True, include the Trials that are assigned to a + :py:class:`~.TrialRunner`. If False, return only the trials + that are not assigned to any :py:class:`~.TrialRunner`. + If None, return all trials regardless of their assignment. Returns ------- diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index eb47de7d714..30ce3a11a3c 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -235,25 +235,36 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access ) - def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: + # TODO: Add tests for trial_runner_assigned filtering. 
+ def pending_trials( + self, + timestamp: datetime, + *, + running: bool = False, + trial_runner_assigned: bool | None = None, + ) -> Iterator[Storage.Trial]: timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) if running: - pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name] + statuses = [Status.PENDING, Status.READY, Status.RUNNING] else: - pending_status = [Status.PENDING.name] + statuses = [Status.PENDING] with self._engine.connect() as conn: - cur_trials = conn.execute( - self._schema.trial.select().where( - self._schema.trial.c.exp_id == self._experiment_id, - ( - self._schema.trial.c.ts_start.is_(None) - | (self._schema.trial.c.ts_start <= timestamp) - ), - self._schema.trial.c.ts_end.is_(None), - self._schema.trial.c.status.in_(pending_status), - ) + stmt = self._schema.trial.select().where( + self._schema.trial.c.exp_id == self._experiment_id, + ( + self._schema.trial.c.ts_start.is_(None) + | (self._schema.trial.c.ts_start <= timestamp) + ), + self._schema.trial.c.ts_end.is_(None), + self._schema.trial.c.status.in_([s.name for s in statuses]), ) + if trial_runner_assigned: + stmt.where(self._schema.trial.c.trial_runner_id.isnot(None)) + elif trial_runner_assigned is False: + stmt.where(self._schema.trial.c.trial_runner_id.is_(None)) + # else: # No filtering by trial_runner_id + cur_trials = conn.execute(stmt) for trial in cur_trials.fetchall(): tunables = self._get_key_val( conn, From a10529a058a7a729cf458c208553dc986769918a Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 16:31:22 -0500 Subject: [PATCH 5/9] doc improvements --- .../mlos_bench/schedulers/base_scheduler.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index 5bbbedfbcac..c3706e2d100 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -500,9 +500,24 @@ def run_schedule(self, running: bool = False) -> None: Check for :py:class:`.Trial`s with `:py:attr:`.Status.PENDING` and an assigned :py:attr:`~.Trial.trial_runner_id` in the queue and run them with :py:meth:`~.Scheduler.run_trial`. + + Subclasses can override this method to implement a more sophisticated + scheduling policy. + + Parameters + ---------- + running : bool + If True, run the trials that are already in a "running" state (e.g., to resume them). + If False (default), run the trials that are pending. 
""" assert self.experiment is not None - pending_trials = list(self.experiment.pending_trials(datetime.now(UTC), running=running)) + pending_trials = list( + self.experiment.pending_trials( + datetime.now(UTC), + running=running, + trial_runner_assigned=True, + ) + ) for trial in pending_trials: if trial.trial_runner_id is None: logging.warning("Trial %s has no TrialRunner assigned yet.") From 263452ae83b15fc2e12f73ec538cd00fd18b44fa Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 17:23:41 -0500 Subject: [PATCH 6/9] doc fixups --- mlos_bench/mlos_bench/schedulers/base_scheduler.py | 11 ++++++----- mlos_bench/mlos_bench/storage/base_storage.py | 7 +++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index c3706e2d100..e872be4ff18 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -349,7 +349,7 @@ def add_trial_to_queue( tunables : TunableGroups The tunable configuration to add to the queue. - ts_start : datetime | None + ts_start : datetime.datetime | None Optional timestamp to use to start the trial. Notes @@ -405,7 +405,8 @@ def _add_trial_to_queue( def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None: """ - Assigns :py:class:`~.TrialRunner`s to the given :py:class:`~.Trial`s in batch. + Assigns a :py:class:`~.TrialRunner` to each :py:class:`~.Storage.Trial` in + the batch. The base class implements a simple round-robin scheduling algorithm for each Trial in sequence. @@ -497,9 +498,9 @@ def run_schedule(self, running: bool = False) -> None: """ Runs the current schedule of trials. - Check for :py:class:`.Trial`s with `:py:attr:`.Status.PENDING` and an - assigned :py:attr:`~.Trial.trial_runner_id` in the queue and run them - with :py:meth:`~.Scheduler.run_trial`. + Check for :py:class:`~.Storage.Trial` instances with `:py:attr:`.Status.PENDING` + and an assigned :py:attr:`~.Storage.Trial.trial_runner_id` in the queue and run + them with :py:meth:`~.Scheduler.run_trial`. Subclasses can override this method to implement a more sophisticated scheduling policy. diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index 460a8ce488f..c64ab5457c5 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -316,10 +316,9 @@ def pending_trials( trial_runner_assigned: bool | None = None, ) -> Iterator["Storage.Trial"]: """ - Return an iterator over the :py:class:`~.Storage.Trial`s that are - :py:attr:`~.Status.PENDING` and have a scheduled - :py:attr:`~.Storage.Trial.ts_start` time to run on or before the specified - timestamp. + Return an iterator over :py:attr:`~.Status.PENDING` + :py:class:`~.Storage.Trial` instances that have a scheduled start + time to run on or before the specified timestamp. 
Parameters ---------- From 9398b1817e6582c0c949d57e1b671413658a472c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 May 2025 22:24:04 +0000 Subject: [PATCH 7/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mlos_bench/mlos_bench/schedulers/base_scheduler.py | 4 ++-- mlos_bench/mlos_bench/storage/base_storage.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index e872be4ff18..58b36753970 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -405,8 +405,8 @@ def _add_trial_to_queue( def assign_trial_runners(self, trials: Iterable[Storage.Trial]) -> None: """ - Assigns a :py:class:`~.TrialRunner` to each :py:class:`~.Storage.Trial` in - the batch. + Assigns a :py:class:`~.TrialRunner` to each :py:class:`~.Storage.Trial` in the + batch. The base class implements a simple round-robin scheduling algorithm for each Trial in sequence. diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index c64ab5457c5..52ecf93b7d6 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -317,8 +317,8 @@ def pending_trials( ) -> Iterator["Storage.Trial"]: """ Return an iterator over :py:attr:`~.Status.PENDING` - :py:class:`~.Storage.Trial` instances that have a scheduled start - time to run on or before the specified timestamp. + :py:class:`~.Storage.Trial` instances that have a scheduled start time to + run on or before the specified timestamp. Parameters ---------- From 281dbf465f71623a4101ea9f49f6660120181c51 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Fri, 9 May 2025 17:53:10 -0500 Subject: [PATCH 8/9] add tests and fixes --- .../mlos_bench/storage/sql/experiment.py | 5 +- .../tests/storage/trial_schedule_test.py | 47 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 30ce3a11a3c..2d7db8e34ef 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -235,7 +235,6 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access ) - # TODO: Add tests for trial_runner_assigned filtering. 
def pending_trials( self, timestamp: datetime, @@ -260,9 +259,9 @@ def pending_trials( self._schema.trial.c.status.in_([s.name for s in statuses]), ) if trial_runner_assigned: - stmt.where(self._schema.trial.c.trial_runner_id.isnot(None)) + stmt = stmt.where(self._schema.trial.c.trial_runner_id.isnot(None)) elif trial_runner_assigned is False: - stmt.where(self._schema.trial.c.trial_runner_id.is_(None)) + stmt = stmt.where(self._schema.trial.c.trial_runner_id.is_(None)) # else: # No filtering by trial_runner_id cur_trials = conn.execute(stmt) for trial in cur_trials.fetchall(): diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py index aaf545c787f..faefe4998bc 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py @@ -61,6 +61,53 @@ def test_schedule_trial( trial_now2_data = exp_data.trials[trial_now2.trial_id] assert trial_now2_data.trial_runner_id == trial_now2.trial_runner_id + # --- Test the trial_runner_assigned parameter --- + # At this point: + # - trial_now1: no trial_runner assigned + # - trial_now2: trial_runner assigned + # - trial_1h, trial_2h: no trial_runner assigned + + # All pending trials (should include all 4) + all_pending = _trial_ids( + exp_storage.pending_trials( + timestamp + timedelta_1hr * 3, + running=False, + trial_runner_assigned=None, + ) + ) + assert all_pending == { + trial_now1.trial_id, + trial_now2.trial_id, + trial_1h.trial_id, + trial_2h.trial_id, + }, f"Expected all pending trials, got {all_pending}" + + # Only those with a trial_runner assigned + assigned_pending = _trial_ids( + exp_storage.pending_trials( + timestamp + timedelta_1hr * 3, + running=False, + trial_runner_assigned=True, + ) + ) + assert assigned_pending == { + trial_now2.trial_id + }, f"Expected only trials with a runner assigned, got {assigned_pending}" + + # Only those without a trial_runner assigned + unassigned_pending = _trial_ids( + exp_storage.pending_trials( + timestamp + timedelta_1hr * 3, + running=False, + trial_runner_assigned=False, + ) + ) + assert unassigned_pending == { + trial_now1.trial_id, + trial_1h.trial_id, + trial_2h.trial_id, + }, f"Expected only trials without a runner assigned, got {unassigned_pending}" + # Scheduler side: get trials ready to run at certain timestamps: # Pretend 1 minute has passed, get trials scheduled to run: From defeb2848689883336739a6ad0c8e4944f43e67e Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Sat, 10 May 2025 10:06:22 -0500 Subject: [PATCH 9/9] fixup suggested --- mlos_bench/mlos_bench/schedulers/base_scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index 58b36753970..e2403333bb4 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -520,9 +520,9 @@ def run_schedule(self, running: bool = False) -> None: ) ) for trial in pending_trials: - if trial.trial_runner_id is None: - logging.warning("Trial %s has no TrialRunner assigned yet.") - continue + assert ( + trial.trial_runner_id is not None + ), f"Trial {trial} has no TrialRunner assigned yet." self.run_trial(trial) def not_done(self) -> bool:
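
One note on the `stmt = stmt.where(...)` fix in the `experiment.py` hunk above: SQLAlchemy statement objects are generative, so `.where()` returns a new `Select` rather than mutating the receiver, and the earlier unassigned calls silently dropped the runner filter. A standalone illustration (the table definition below is a simplified stand-in for the real schema):

```python
# Why the reassignment matters: SQLAlchemy Select objects are generative /
# immutable, so .where() returns a *new* statement instead of modifying stmt.
from sqlalchemy import Column, Integer, MetaData, Table, select

trial = Table(  # simplified stand-in for the real trial table schema
    "trial",
    MetaData(),
    Column("trial_id", Integer, primary_key=True),
    Column("trial_runner_id", Integer, nullable=True),
)

stmt = select(trial)
stmt.where(trial.c.trial_runner_id.is_(None))         # new statement discarded: no filter applied
stmt = stmt.where(trial.c.trial_runner_id.is_(None))  # keep the new statement: filter applied
print(stmt)  # ... WHERE trial.trial_runner_id IS NULL
```
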