From 9fd1c793c3aaaf346a7e9f78fd59e8fa867508db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Tue, 9 Jun 2026 21:15:42 +0200 Subject: [PATCH 1/8] Fixed batched_futures() --- src/executorlib/standalone/batched.py | 41 +++++++++++++++---- .../task_scheduler/interactive/dependency.py | 26 +++++++++--- 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index addd4ab56..c4e690c01 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -1,5 +1,9 @@ from concurrent.futures import Future +# Future objects we have already reported as failed -- so each failed job is logged once, not on +# every scheduler pass (batched_futures is re-evaluated many times until a batch fills). +_logged_failed_ids: set = set() + def batched_futures(lst: list[Future], skip_lst: list[list], n: int) -> list[list]: """ @@ -7,6 +11,14 @@ def batched_futures(lst: list[Future], skip_lst: list[list], n: int) -> list[lis not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_lst then they are returned as batch. + Futures that completed with an EXCEPTION (e.g. a labeling job that failed on a degenerate config, or a dead worker) + are EXCLUDED from the batch rather than re-raised. Calling ``.result()`` on a failed future re-raises its exception; + in the dependency scheduler that turns into ``set_exception`` on this batch future, which then cascades to every + downstream task (combine_b / featurize / fit / cost / pareto) depending on the batch -- i.e. a single bad config + silently kills the whole pipeline. Each failed future is logged once. When the entire input is resolved but a full + batch of n cannot be formed (because some futures failed), the partial remainder is returned so the pipeline does + not stall forever waiting for a batch that can never fill. + Args: lst (list): list of all future objects skip_lst (list): list of previous batches of future objects @@ -15,13 +27,28 @@ def batched_futures(lst: list[Future], skip_lst: list[list], n: int) -> list[lis Returns: list: results of the batched futures """ - skipped_elements_lst = [item for items in skip_lst for item in items] + skipped_ids = {id(item) for items in skip_lst for item in items} - done_lst = [] - n_expected = min(n, len(lst) - len(skipped_elements_lst)) + done_lst: list = [] + all_resolved = True for v in lst: - if v.done() and v.result() not in skipped_elements_lst: - done_lst.append(v.result()) - if len(done_lst) == n_expected: - return done_lst + if v.done(): + if v.exception() is not None: + if id(v) not in _logged_failed_ids: + _logged_failed_ids.add(id(v)) + print( + f"[batched_futures] EXCLUDING failed future from batch: " + f"{type(v.exception()).__name__}: {v.exception()}", + flush=True, + ) + continue # failed future: exclude instead of re-raising (which would poison all dependents) + result = v.result() + if id(result) not in skipped_ids: + done_lst.append(result) + if len(done_lst) == n: + return done_lst + else: + all_resolved = False + if all_resolved and len(done_lst) > 0: + return done_lst # end of input reached; emit final (possibly short) batch return [] diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index fe24c3787..39f25750c 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -281,6 +281,15 @@ def _execute_tasks_with_dependencies( task_dict["future"].set_result(False) else: task_dict["future"].set_result(True) + elif ( # batched collector: readiness is its skip_lst + batched_futures (which scans `lst` + # once, when ready, preserving completion-order). Do NOT run get_future_objects_from_input + # on kwargs -- `lst` can be 100k+ futures, making ingestion (and every wait-list pass) O(N) + # per collector and stalling the scheduler. Track only the small skip_lst as future_lst. + task_dict is not None and task_dict.get("fn") == "batched" and "future" in task_dict + ): + task_dict["future_lst"] = task_dict["kwargs"]["skip_lst"] + wait_lst.append(task_dict) + future_queue.task_done() elif ( # handle function submitted to the executor task_dict is not None and "fn" in task_dict and "future" in task_dict ): @@ -343,11 +352,18 @@ def _update_waiting_task( elif task_wait_dict["fn"] == "batched" and all( future.done() for future in task_wait_dict["kwargs"]["skip_lst"] ): - done_lst = batched_futures( - lst=task_wait_dict["kwargs"]["lst"], - n=task_wait_dict["kwargs"]["n"], - skip_lst=[f.result() for f in task_wait_dict["kwargs"]["skip_lst"]], - ) + try: + done_lst = batched_futures( + lst=task_wait_dict["kwargs"]["lst"], + n=task_wait_dict["kwargs"]["n"], + skip_lst=[f.result() for f in task_wait_dict["kwargs"]["skip_lst"]], + ) + except Exception as exc: + # A future in `lst` (or skip_lst) raised. Propagate to the batch future instead of + # crashing the scheduler thread. (We no longer scan all of `lst` for exceptions via + # future_lst for performance, so batched_futures' .result() is where they surface.) + task_wait_dict["future"].set_exception(exc) + continue if len(done_lst) == 0: wait_tmp_lst.append(task_wait_dict) else: From 6976d48eb1e2004f6cb0e4c86a720be7a113bfd7 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Tue, 9 Jun 2026 19:16:45 +0000 Subject: [PATCH 2/8] Format black --- src/executorlib/task_scheduler/interactive/dependency.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 39f25750c..5ff8f23c1 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -285,7 +285,9 @@ def _execute_tasks_with_dependencies( # once, when ready, preserving completion-order). Do NOT run get_future_objects_from_input # on kwargs -- `lst` can be 100k+ futures, making ingestion (and every wait-list pass) O(N) # per collector and stalling the scheduler. Track only the small skip_lst as future_lst. - task_dict is not None and task_dict.get("fn") == "batched" and "future" in task_dict + task_dict is not None + and task_dict.get("fn") == "batched" + and "future" in task_dict ): task_dict["future_lst"] = task_dict["kwargs"]["skip_lst"] wait_lst.append(task_dict) From 67f2570c46c1d2fac0b76a7ea7f803c04dd1c8a0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Jun 2026 08:09:35 +0000 Subject: [PATCH 3/8] Plan: Add unit tests for newly added lines in batched.py and dependency.py --- src/executorlib/_version.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/executorlib/_version.py b/src/executorlib/_version.py index 0e58bba20..c34c5f09f 100644 --- a/src/executorlib/_version.py +++ b/src/executorlib/_version.py @@ -1,21 +1,24 @@ -# file generated by setuptools-scm +# file generated by vcs-versioning # don't change, don't track in version control +from __future__ import annotations -__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"] - -TYPE_CHECKING = False -if TYPE_CHECKING: - from typing import Tuple - from typing import Union - - VERSION_TUPLE = Tuple[Union[int, str], ...] -else: - VERSION_TUPLE = object +__all__ = [ + "__version__", + "__version_tuple__", + "version", + "version_tuple", + "__commit_id__", + "commit_id", +] version: str __version__: str -__version_tuple__: VERSION_TUPLE -version_tuple: VERSION_TUPLE +__version_tuple__: tuple[int | str, ...] +version_tuple: tuple[int | str, ...] +commit_id: str | None +__commit_id__: str | None + +__version__ = version = '0.1.dev2+g6976d48eb' +__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g6976d48eb') -__version__ = version = "0.0.1" -__version_tuple__ = version_tuple = (0, 0, 1) +__commit_id__ = commit_id = None From cad969f5f885c682801d36d210a13c0d7417842a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Jun 2026 10:11:12 +0200 Subject: [PATCH 4/8] Refactor versioning to use setuptools-scm Updated versioning information and cleaned up type hints. --- src/executorlib/_version.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/executorlib/_version.py b/src/executorlib/_version.py index c34c5f09f..0e58bba20 100644 --- a/src/executorlib/_version.py +++ b/src/executorlib/_version.py @@ -1,24 +1,21 @@ -# file generated by vcs-versioning +# file generated by setuptools-scm # don't change, don't track in version control -from __future__ import annotations -__all__ = [ - "__version__", - "__version_tuple__", - "version", - "version_tuple", - "__commit_id__", - "commit_id", -] +__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"] + +TYPE_CHECKING = False +if TYPE_CHECKING: + from typing import Tuple + from typing import Union + + VERSION_TUPLE = Tuple[Union[int, str], ...] +else: + VERSION_TUPLE = object version: str __version__: str -__version_tuple__: tuple[int | str, ...] -version_tuple: tuple[int | str, ...] -commit_id: str | None -__commit_id__: str | None - -__version__ = version = '0.1.dev2+g6976d48eb' -__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g6976d48eb') +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE -__commit_id__ = commit_id = None +__version__ = version = "0.0.1" +__version_tuple__ = version_tuple = (0, 0, 1) From 0a12ed7890d4d93cc3b7946c654f90f66595fd49 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Jun 2026 08:11:19 +0000 Subject: [PATCH 5/8] Add unit tests for new batched_futures behavior and dependency scheduler batched path --- .../unit/executor/test_single_dependencies.py | 17 +++++++ tests/unit/standalone/test_batched.py | 50 ++++++++++++++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/tests/unit/executor/test_single_dependencies.py b/tests/unit/executor/test_single_dependencies.py index d06ee0694..f861c65dc 100644 --- a/tests/unit/executor/test_single_dependencies.py +++ b/tests/unit/executor/test_single_dependencies.py @@ -82,6 +82,23 @@ def test_batched(self): self.assertEqual(len(result_lst), 4) self.assertTrue(t3-t2 > t2-t1) + def test_batched_with_failed_upstream_future(self): + """A failed future in lst must be excluded from batches; downstream must not see an exception.""" + # 5 successful futures (returning 0–4) + 1 failed = 6 total → 2 batch futures (n=3). + # Expected batches (in completion order): [0,1,2] and [3,4] (partial, all_resolved). + with SingleNodeExecutor() as exe: + cloudpickle_register(ind=1) + future_lst = [] + for i in range(5): + future_lst.append(exe.submit(return_input_dict, i)) + future_lst.append(exe.submit(raise_error, parameter=0)) + future_second_lst = exe.batched(future_lst, n=3) + result_lst = [f.result() for f in future_second_lst] + # All batch futures must succeed (no exception cascaded from the failed input) + self.assertEqual(len(result_lst), 2) + # The union of all batched results must be exactly {0, 1, 2, 3, 4} + self.assertEqual(set(item for batch in result_lst for item in batch), {0, 1, 2, 3, 4}) + def test_batched_error(self): with self.assertRaises(TypeError): with SingleNodeExecutor() as exe: diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index 9b811d26a..8730b6c83 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -1,9 +1,15 @@ from unittest import TestCase from concurrent.futures import Future -from executorlib.standalone.batched import batched_futures +from executorlib.standalone.batched import batched_futures, _logged_failed_ids class TestBatched(TestCase): + def setUp(self): + _logged_failed_ids.clear() + + def tearDown(self): + _logged_failed_ids.clear() + def test_batched_futures(self): lst = [] for i in list(range(10)): @@ -21,3 +27,45 @@ def test_batched_futures_not_finished(self): f = Future() lst.append(f) self.assertEqual(batched_futures(lst=lst, n=3, skip_lst=[]), []) + + def test_batched_futures_with_failed_future(self): + """Failed futures are excluded from the batch rather than raising.""" + lst = [] + for i in range(5): + f = Future() + f.set_result(i) + lst.append(f) + f_failed = Future() + f_failed.set_exception(RuntimeError("task failed")) + lst.insert(2, f_failed) # insert at position 2: [0, 1, FAILED, 2, 3, 4] + # The failed future must not propagate; first 3 successful results are returned + result = batched_futures(lst=lst, n=3, skip_lst=[]) + self.assertEqual(result, [0, 1, 2]) + # The failed future's id is recorded so it is only logged once + self.assertIn(id(f_failed), _logged_failed_ids) + + def test_batched_futures_failed_future_logged_once(self): + """A failed future is only logged once, even across multiple calls.""" + f_failed = Future() + f_failed.set_exception(RuntimeError("task failed")) + lst = [f_failed] + batched_futures(lst=lst, n=1, skip_lst=[]) + self.assertIn(id(f_failed), _logged_failed_ids) + size_after_first_call = len(_logged_failed_ids) + # Second call must not add the id again + batched_futures(lst=lst, n=1, skip_lst=[]) + self.assertEqual(len(_logged_failed_ids), size_after_first_call) + + def test_batched_futures_partial_batch_due_to_failures(self): + """Emit a partial batch when all futures are resolved but n is unreachable due to failures.""" + lst = [] + for i in range(2): + f = Future() + f.set_result(i) + lst.append(f) + f_failed = Future() + f_failed.set_exception(RuntimeError("task failed")) + lst.append(f_failed) + # all_resolved=True, only 2 successful results remain — must emit partial batch [0, 1] + result = batched_futures(lst=lst, n=3, skip_lst=[]) + self.assertEqual(result, [0, 1]) From 11c7bcb7a93793a8bb176a7ed8923c8eaf9c35f1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Jun 2026 08:43:56 +0000 Subject: [PATCH 6/8] Add test for exception handler in _update_waiting_task batched path --- src/executorlib/_version.py | 33 +++++++------- .../unit/executor/test_single_dependencies.py | 43 ++++++++++++++++++- 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/src/executorlib/_version.py b/src/executorlib/_version.py index 0e58bba20..d0b898db0 100644 --- a/src/executorlib/_version.py +++ b/src/executorlib/_version.py @@ -1,21 +1,24 @@ -# file generated by setuptools-scm +# file generated by vcs-versioning # don't change, don't track in version control +from __future__ import annotations -__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"] - -TYPE_CHECKING = False -if TYPE_CHECKING: - from typing import Tuple - from typing import Union - - VERSION_TUPLE = Tuple[Union[int, str], ...] -else: - VERSION_TUPLE = object +__all__ = [ + "__version__", + "__version_tuple__", + "version", + "version_tuple", + "__commit_id__", + "commit_id", +] version: str __version__: str -__version_tuple__: VERSION_TUPLE -version_tuple: VERSION_TUPLE +__version_tuple__: tuple[int | str, ...] +version_tuple: tuple[int | str, ...] +commit_id: str | None +__commit_id__: str | None + +__version__ = version = '0.1.dev2+g0a12ed789.d20260610' +__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g0a12ed789.d20260610') -__version__ = version = "0.0.1" -__version_tuple__ = version_tuple = (0, 0, 1) +__commit_id__ = commit_id = None diff --git a/tests/unit/executor/test_single_dependencies.py b/tests/unit/executor/test_single_dependencies.py index f861c65dc..58e708df3 100644 --- a/tests/unit/executor/test_single_dependencies.py +++ b/tests/unit/executor/test_single_dependencies.py @@ -3,10 +3,14 @@ from time import sleep, time from queue import Queue from threading import Thread +from unittest.mock import MagicMock from executorlib import SingleNodeExecutor from executorlib.executor.single import create_single_node_executor -from executorlib.task_scheduler.interactive.dependency import _execute_tasks_with_dependencies +from executorlib.task_scheduler.interactive.dependency import ( + _execute_tasks_with_dependencies, + _update_waiting_task, +) from executorlib.standalone.serialize import cloudpickle_register from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -300,6 +304,43 @@ def test_future_input_dict(self): ) self.assertEqual(fs.result()["a"], 4) + def test_update_waiting_task_batched_exception(self): + """_update_waiting_task catches exceptions from batched_futures and sets them on the batch future.""" + executor_queue = Queue() + batch_future = Future() + + # A mock skip_lst future: done(), exception() returns None (passes get_exception_lst), + # but result() raises -- triggering the except block in _update_waiting_task. + mock_skip_future = MagicMock() + mock_skip_future.done.return_value = True + mock_skip_future.exception.return_value = None + mock_skip_future.result.side_effect = RuntimeError("unexpected skip error") + + task_dict = { + "fn": "batched", + "args": (), + "kwargs": { + "lst": [], + "n": 3, + "skip_lst": [mock_skip_future], + }, + "future": batch_future, + "future_lst": [mock_skip_future], + "resource_dict": {}, + } + + result_lst = _update_waiting_task( + wait_lst=[task_dict], + executor_queue=executor_queue, + refresh_rate=0.0, + ) + + # The batch future must have the exception propagated (not crashed the scheduler) + self.assertTrue(batch_future.done()) + self.assertIsInstance(batch_future.exception(), RuntimeError) + # The failed task is consumed (not re-queued in the wait list) + self.assertEqual(len(result_lst), 0) + class TestExecutorErrors(unittest.TestCase): def test_block_allocation_false_one_worker(self): From 3177aff93f61126cd3dd0885193b085108a62d86 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Wed, 10 Jun 2026 08:46:04 +0000 Subject: [PATCH 7/8] Format black --- src/executorlib/_version.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/executorlib/_version.py b/src/executorlib/_version.py index d0b898db0..fde03ef36 100644 --- a/src/executorlib/_version.py +++ b/src/executorlib/_version.py @@ -18,7 +18,7 @@ commit_id: str | None __commit_id__: str | None -__version__ = version = '0.1.dev2+g0a12ed789.d20260610' -__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g0a12ed789.d20260610') +__version__ = version = "0.1.dev2+g0a12ed789.d20260610" +__version_tuple__ = version_tuple = (0, 1, "dev2", "g0a12ed789.d20260610") __commit_id__ = commit_id = None From 98cb58c7dbe6664affab6c32bf25286d1a25c1b4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Jun 2026 10:46:25 +0200 Subject: [PATCH 8/8] Update _version.py --- src/executorlib/_version.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/executorlib/_version.py b/src/executorlib/_version.py index fde03ef36..0e58bba20 100644 --- a/src/executorlib/_version.py +++ b/src/executorlib/_version.py @@ -1,24 +1,21 @@ -# file generated by vcs-versioning +# file generated by setuptools-scm # don't change, don't track in version control -from __future__ import annotations -__all__ = [ - "__version__", - "__version_tuple__", - "version", - "version_tuple", - "__commit_id__", - "commit_id", -] +__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"] + +TYPE_CHECKING = False +if TYPE_CHECKING: + from typing import Tuple + from typing import Union + + VERSION_TUPLE = Tuple[Union[int, str], ...] +else: + VERSION_TUPLE = object version: str __version__: str -__version_tuple__: tuple[int | str, ...] -version_tuple: tuple[int | str, ...] -commit_id: str | None -__commit_id__: str | None - -__version__ = version = "0.1.dev2+g0a12ed789.d20260610" -__version_tuple__ = version_tuple = (0, 1, "dev2", "g0a12ed789.d20260610") +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE -__commit_id__ = commit_id = None +__version__ = version = "0.0.1" +__version_tuple__ = version_tuple = (0, 0, 1)