From 9a6ac7b526033149773d70cf6d46f741bb931b2c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 14:38:25 +0200 Subject: [PATCH 1/5] [Feature] Faster batching --- .../task_scheduler/interactive/dependency.py | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 9f2a69dd..56e30538 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -249,7 +249,7 @@ def _execute_tasks_with_dependencies( executor (TaskSchedulerBase): Executor to execute the tasks with after the dependencies are resolved. refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. """ - wait_lst: list = [] + future_dependency_lst: list = [] while True: try: task_dict = future_queue.get_nowait() @@ -258,10 +258,10 @@ def _execute_tasks_with_dependencies( if ( # shutdown the executor task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"] ): - while len(wait_lst) > 0: + while len(future_dependency_lst) > 0: # Check functions in the wait list and execute them if all future objects are now ready - wait_lst = _update_waiting_task( - wait_lst=wait_lst, + future_dependency_lst = _handle_future_dependencies( + future_dependency_lst=future_dependency_lst, executor_queue=executor_queue, refresh_rate=refresh_rate, ) @@ -283,8 +283,13 @@ def _execute_tasks_with_dependencies( task_dict["future"].set_result(False) else: task_dict["future"].set_result(True) + elif ( # handle batched function submitted to the executor + task_dict is not None and "fn" in task_dict and task_dict["fn"] == "batched" and "future" in task_dict + ): + future_dependency_lst.append(task_dict) + future_queue.task_done() elif ( # handle function submitted to the executor - task_dict is not None and "fn" in task_dict and "future" in task_dict + task_dict is not None and "fn" in task_dict and task_dict["fn"] != "batched" and "future" in task_dict ): future_lst, ready_flag = get_future_objects_from_input( args=task_dict["args"], kwargs=task_dict["kwargs"] @@ -301,12 +306,12 @@ def _execute_tasks_with_dependencies( executor_queue.put(task_dict) else: # Otherwise add the function to the wait list task_dict["future_lst"] = future_lst - wait_lst.append(task_dict) + future_dependency_lst.append(task_dict) future_queue.task_done() - elif len(wait_lst) > 0: + elif len(future_dependency_lst) > 0: # Check functions in the wait list and execute them if all future objects are now ready - wait_lst = _update_waiting_task( - wait_lst=wait_lst, + future_dependency_lst = _handle_future_dependencies( + future_dependency_lst=future_dependency_lst, executor_queue=executor_queue, refresh_rate=refresh_rate, ) @@ -315,14 +320,14 @@ def _execute_tasks_with_dependencies( sleep(refresh_rate) -def _update_waiting_task( - wait_lst: list[dict], executor_queue: queue.Queue, refresh_rate: float = 0.01 +def _handle_future_dependencies( + future_dependency_lst: list[dict], executor_queue: queue.Queue, refresh_rate: float = 0.01 ) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor Args: - wait_lst (list): List of waiting tasks + future_dependency_lst (list): List of waiting tasks executor_queue (Queue): Queue of the internal executor refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. @@ -330,7 +335,7 @@ def _update_waiting_task( list: list tasks which future inputs have not been completed """ wait_tmp_lst = [] - for task_wait_dict in wait_lst: + for task_wait_dict in future_dependency_lst: exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"]) if len(exception_lst) > 0 and task_wait_dict["fn"] != "batched": task_wait_dict["future"].set_exception(exception_lst[0]) @@ -360,6 +365,6 @@ def _update_waiting_task( task_wait_dict["future_skip"].set_result([id(f) for f in done_lst]) else: wait_tmp_lst.append(task_wait_dict) - if len(wait_lst) == len(wait_tmp_lst): + if len(future_dependency_lst) == len(wait_tmp_lst): sleep(refresh_rate) return wait_tmp_lst From f98f9c427fa10b0ed470e8cad32d86c9244b7e33 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Thu, 11 Jun 2026 12:39:41 +0000 Subject: [PATCH 2/5] Format black --- .../task_scheduler/interactive/dependency.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 56e30538..43136d81 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -284,12 +284,18 @@ def _execute_tasks_with_dependencies( else: task_dict["future"].set_result(True) elif ( # handle batched function submitted to the executor - task_dict is not None and "fn" in task_dict and task_dict["fn"] == "batched" and "future" in task_dict + task_dict is not None + and "fn" in task_dict + and task_dict["fn"] == "batched" + and "future" in task_dict ): future_dependency_lst.append(task_dict) future_queue.task_done() elif ( # handle function submitted to the executor - task_dict is not None and "fn" in task_dict and task_dict["fn"] != "batched" and "future" in task_dict + task_dict is not None + and "fn" in task_dict + and task_dict["fn"] != "batched" + and "future" in task_dict ): future_lst, ready_flag = get_future_objects_from_input( args=task_dict["args"], kwargs=task_dict["kwargs"] @@ -321,7 +327,9 @@ def _execute_tasks_with_dependencies( def _handle_future_dependencies( - future_dependency_lst: list[dict], executor_queue: queue.Queue, refresh_rate: float = 0.01 + future_dependency_lst: list[dict], + executor_queue: queue.Queue, + refresh_rate: float = 0.01, ) -> list: """ Submit the waiting tasks, which future inputs have been completed, to the executor From cb96b269e69a561b71d62db2560372694584a4a0 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 15:36:23 +0200 Subject: [PATCH 3/5] fixes --- src/executorlib/standalone/interactive/arguments.py | 8 ++++++-- src/executorlib/task_scheduler/interactive/dependency.py | 5 ++++- tests/unit/standalone/interactive/test_arguments.py | 7 +++++-- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/executorlib/standalone/interactive/arguments.py b/src/executorlib/standalone/interactive/arguments.py index f51f0b02..4c1107a0 100644 --- a/src/executorlib/standalone/interactive/arguments.py +++ b/src/executorlib/standalone/interactive/arguments.py @@ -27,10 +27,14 @@ def find_future_in_list(lst): find_future_in_list(lst=args) find_future_in_list(lst=kwargs.values()) - boolean_flag = len([future for future in future_lst if future.done()]) == len( + + return future_lst + + +def check_list_of_futures_is_done(future_lst: list[Future]) -> bool: + return len([future for future in future_lst if future.done()]) == len( future_lst ) - return future_lst, boolean_flag def get_exception_lst(future_lst: list[Future]) -> list: diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 43136d81..fb9c1f13 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -7,6 +7,7 @@ from executorlib.standalone.batched import batched_futures from executorlib.standalone.interactive.arguments import ( check_exception_was_raised, + check_list_of_futures_is_done, get_exception_lst, get_future_objects_from_input, update_futures_in_input, @@ -185,6 +186,7 @@ def batched( "args": (), "kwargs": {"lst": iterable, "n": n, "skip_lst": skip_lst}, "future": f, + "future_lst": iterable, "future_skip": f_skip, "resource_dict": {}, } @@ -297,9 +299,10 @@ def _execute_tasks_with_dependencies( and task_dict["fn"] != "batched" and "future" in task_dict ): - future_lst, ready_flag = get_future_objects_from_input( + future_lst = get_future_objects_from_input( args=task_dict["args"], kwargs=task_dict["kwargs"] ) + ready_flag = check_list_of_futures_is_done(future_lst=future_lst) exception_lst = get_exception_lst(future_lst=future_lst) if not check_exception_was_raised(future_obj=task_dict["future"]): if len(exception_lst) > 0: diff --git a/tests/unit/standalone/interactive/test_arguments.py b/tests/unit/standalone/interactive/test_arguments.py index 2e86e9eb..40076b2e 100644 --- a/tests/unit/standalone/interactive/test_arguments.py +++ b/tests/unit/standalone/interactive/test_arguments.py @@ -3,6 +3,7 @@ from executorlib.standalone.interactive.arguments import ( check_exception_was_raised, + check_list_of_futures_is_done, get_exception_lst, get_future_objects_from_input, update_futures_in_input, @@ -13,14 +14,16 @@ class TestSerial(unittest.TestCase): def test_get_future_objects_from_input_with_future(self): input_args = (1, 2, Future(), [Future()], {3: Future()}) input_kwargs = {"a": 1, "b": [Future()], "c": {"d": Future()}, "e": Future()} - future_lst, boolean_flag = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + future_lst = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + boolean_flag = check_list_of_futures_is_done(future_lst=future_lst) self.assertEqual(len(future_lst), 6) self.assertFalse(boolean_flag) def test_get_future_objects_from_input_without_future(self): input_args = (1, 2) input_kwargs = {"a": 1} - future_lst, boolean_flag = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + future_lst = get_future_objects_from_input(args=input_args, kwargs=input_kwargs) + boolean_flag = check_list_of_futures_is_done(future_lst=future_lst) self.assertEqual(len(future_lst), 0) self.assertTrue(boolean_flag) From 85ea7195d5161aa70c00bcb1a92b4bc6777f0254 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Jun 2026 13:36:43 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/standalone/interactive/arguments.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/executorlib/standalone/interactive/arguments.py b/src/executorlib/standalone/interactive/arguments.py index 4c1107a0..423242d8 100644 --- a/src/executorlib/standalone/interactive/arguments.py +++ b/src/executorlib/standalone/interactive/arguments.py @@ -32,9 +32,7 @@ def find_future_in_list(lst): def check_list_of_futures_is_done(future_lst: list[Future]) -> bool: - return len([future for future in future_lst if future.done()]) == len( - future_lst - ) + return len([future for future in future_lst if future.done()]) == len(future_lst) def get_exception_lst(future_lst: list[Future]) -> list: From 4bb3015d8ffc6d31ce84306ecbf6f7891481389d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 15:54:54 +0200 Subject: [PATCH 5/5] Add docstring --- src/executorlib/standalone/interactive/arguments.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/executorlib/standalone/interactive/arguments.py b/src/executorlib/standalone/interactive/arguments.py index 423242d8..7f0f7d89 100644 --- a/src/executorlib/standalone/interactive/arguments.py +++ b/src/executorlib/standalone/interactive/arguments.py @@ -32,6 +32,15 @@ def find_future_in_list(lst): def check_list_of_futures_is_done(future_lst: list[Future]) -> bool: + """ + Check if all future objects in the list of future objects are done + + Args: + future_lst (list): list of future objects + + Returns: + bool: True if all future objects in the list of future objects are done, False otherwise + """ return len([future for future in future_lst if future.done()]) == len(future_lst)