From b173e16a95095ad21c316bc71f0d57135868b6a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 19 Jun 2026 06:29:12 +0200 Subject: [PATCH 1/3] [Fix] propagate init_function errors from all MPI ranks in block allocation mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In interactive_parallel.py the init branch was only propagating errors from rank 0; failures on non-zero ranks were silently swallowed, leaving those ranks with uninitialised memory. Subsequent function calls on the affected ranks would then receive wrong or missing kwargs. The fix mirrors the existing function-execution path: after each rank runs call_funct for init, all errors are gathered to rank 0 via MPI.COMM_WORLD.gather before the success/error response is sent back to the scheduler. This also acts as an implicit barrier so the scheduler cannot dispatch the next task until every rank has finished init. Adds a test (test_internal_memory_mpi) that exercises block allocation with cores=2 and an init_function – a combination that had zero coverage. Co-Authored-By: Claude Sonnet 4.6 --- .../backend/interactive_parallel.py | 17 ++++++++++++----- .../standalone/interactive/test_spawner.py | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/executorlib/backend/interactive_parallel.py b/src/executorlib/backend/interactive_parallel.py index 7f968391d..5aef4e60b 100644 --- a/src/executorlib/backend/interactive_parallel.py +++ b/src/executorlib/backend/interactive_parallel.py @@ -97,22 +97,29 @@ def main() -> None: and "args" in input_dict and "kwargs" in input_dict ): + init_error = None try: memory.update( call_funct(input_dict=input_dict, funct=None, memory=memory) ) except Exception as error: - if mpi_rank_zero: + init_error = error + if mpi_size_larger_one: + all_errors = MPI.COMM_WORLD.gather(init_error, root=0) + else: + all_errors = [init_error] + if mpi_rank_zero: + first_error = next((e for e in all_errors if e is not None), None) + if first_error is not None: interface_send( socket=socket, - result_dict={"error": error}, + result_dict={"error": first_error}, ) backend_write_error_file( - error=error, + error=first_error, apply_dict=input_dict, ) - else: - if mpi_rank_zero: + else: interface_send(socket=socket, result_dict={"result": True}) diff --git a/tests/unit/standalone/interactive/test_spawner.py b/tests/unit/standalone/interactive/test_spawner.py index 1af872cdc..bd76c35e0 100644 --- a/tests/unit/standalone/interactive/test_spawner.py +++ b/tests/unit/standalone/interactive/test_spawner.py @@ -271,6 +271,25 @@ def test_execute_task(self): self.assertEqual(f.result(), np.array([5])) q.join() + @unittest.skipIf( + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." + ) + def test_internal_memory_mpi(self): + with BlockAllocationTaskScheduler( + max_workers=1, + executor_kwargs={ + "cores": 2, + "init_function": set_global, + }, + spawner=MpiExecSpawner, + ) as p: + cloudpickle_register(ind=1) + f = p.submit(get_global) + result = f.result() + self.assertEqual(len(result), 2) + np.testing.assert_array_equal(result[0], np.array([5])) + np.testing.assert_array_equal(result[1], np.array([5])) + class TestBlockAllocationTaskScheduler(unittest.TestCase): def test_submit_tracks_future_state(self): From efe81db528ac806890ee3130016b964e9fd36fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 19 Jun 2026 07:01:59 +0200 Subject: [PATCH 2/3] [Fix] extract _execute_init_dict to satisfy PLR0915 (too many statements) Moves the init-function handling out of main() into a private helper so the statement count stays within the ruff/pylint PLR0915 limit of 50. Co-Authored-By: Claude Sonnet 4.6 --- .../backend/interactive_parallel.py | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/executorlib/backend/interactive_parallel.py b/src/executorlib/backend/interactive_parallel.py index 5aef4e60b..13feb4080 100644 --- a/src/executorlib/backend/interactive_parallel.py +++ b/src/executorlib/backend/interactive_parallel.py @@ -16,6 +16,33 @@ ) +def _execute_init_dict( + input_dict: dict, + memory: dict, + socket: Optional[zmq.Socket], + mpi_rank_zero: bool, + mpi_size_larger_one: bool, +) -> None: + from mpi4py import MPI + + init_error = None + try: + memory.update(call_funct(input_dict=input_dict, funct=None, memory=memory)) + except Exception as error: + init_error = error + if mpi_size_larger_one: + all_errors = MPI.COMM_WORLD.gather(init_error, root=0) + else: + all_errors = [init_error] + if mpi_rank_zero: + first_error = next((e for e in all_errors if e is not None), None) + if first_error is not None: + interface_send(socket=socket, result_dict={"error": first_error}) + backend_write_error_file(error=first_error, apply_dict=input_dict) + else: + interface_send(socket=socket, result_dict={"result": True}) + + def main() -> None: """ Entry point of the program. @@ -97,30 +124,13 @@ def main() -> None: and "args" in input_dict and "kwargs" in input_dict ): - init_error = None - try: - memory.update( - call_funct(input_dict=input_dict, funct=None, memory=memory) - ) - except Exception as error: - init_error = error - if mpi_size_larger_one: - all_errors = MPI.COMM_WORLD.gather(init_error, root=0) - else: - all_errors = [init_error] - if mpi_rank_zero: - first_error = next((e for e in all_errors if e is not None), None) - if first_error is not None: - interface_send( - socket=socket, - result_dict={"error": first_error}, - ) - backend_write_error_file( - error=first_error, - apply_dict=input_dict, - ) - else: - interface_send(socket=socket, result_dict={"result": True}) + _execute_init_dict( + input_dict=input_dict, + memory=memory, + socket=socket, + mpi_rank_zero=mpi_rank_zero, + mpi_size_larger_one=mpi_size_larger_one, + ) if __name__ == "__main__": From 7f562407c83424888a9bf0fac4841ffe7a2dacbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 19 Jun 2026 07:04:28 +0200 Subject: [PATCH 3/3] [Docs] add docstring to _execute_init_dict Co-Authored-By: Claude Sonnet 4.6 --- src/executorlib/backend/interactive_parallel.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/executorlib/backend/interactive_parallel.py b/src/executorlib/backend/interactive_parallel.py index 13feb4080..ff752834a 100644 --- a/src/executorlib/backend/interactive_parallel.py +++ b/src/executorlib/backend/interactive_parallel.py @@ -23,6 +23,23 @@ def _execute_init_dict( mpi_rank_zero: bool, mpi_size_larger_one: bool, ) -> None: + """ + Execute an init-function message and update the in-process memory store. + + Runs the callable in input_dict on every MPI rank, then gathers errors + from all ranks to rank 0 so that a failure on any non-zero rank is not + silently swallowed. Rank 0 sends the result or the first observed error + back to the scheduler via the ZMQ socket. + + Args: + input_dict (dict): Message dict with keys "init", "fn", "args", "kwargs". + memory (dict): Per-rank memory store; updated in-place with the return + value of the init function on success. + socket (zmq.Socket | None): ZMQ socket used by rank 0 to reply to the + scheduler; None on non-zero ranks. + mpi_rank_zero (bool): True only on MPI rank 0. + mpi_size_larger_one (bool): True when the communicator has more than one rank. + """ from mpi4py import MPI init_error = None