From 71a7569323deca4d94650508497018729f86c4e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 12 Jun 2026 11:10:22 +0200 Subject: [PATCH] [Documentation] Consolidate workshop tutorials into the documentation Gather documentation from the executorlib workshop and tutorial repositories which was not yet covered in the main documentation: - docs/coupling.md: new page demonstrating executorlib as a drop-in executor / worker pool for emcee, pipefunc, omp4py and pylammpsmpi - README: "Which Executor should I use?" overview table and links to the new page - 1-single-node: gentle introduction to Future objects (result / done / cancel) - 2-hpc-cluster: "Disconnecting and Reconnecting" (shutdown wait / cancel_futures) and verifying the SLURM resource assignment with sacct - 3-hpc-job: starting Flux from the SlurmClusterExecutor submission template SLURM based examples are kept as non-executed reference code, consistent with the existing notebooks which are executed in CI without a SLURM scheduler. Co-Authored-By: Claude Opus 4.8 --- README.md | 24 +++++ docs/_toc.yml | 1 + docs/coupling.md | 170 ++++++++++++++++++++++++++++++++++ notebooks/1-single-node.ipynb | 2 +- notebooks/2-hpc-cluster.ipynb | 2 +- notebooks/3-hpc-job.ipynb | 27 ++++++ 6 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 docs/coupling.md diff --git a/README.md b/README.md index 6c3c787b0..ff1737d29 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,25 @@ given allocation. Even when [SLURM](https://slurm.schedmd.com) is used as primar recommended to use [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux) as hierarchical job scheduler within the allocations. +## Which Executor should I use? +executorlib provides five `Executor` classes. They all share the same `submit()` / `map()` interface and only differ in +*where* the Python functions are executed and *how* the resources are requested. 
A common workflow is to develop and +test with the `SingleNodeExecutor` on a laptop and then switch to one of the HPC executors by changing only the class +name: + +| Executor | Where it runs | Scheduler command | Best for | +|---|---|---|---| +| [`SingleNodeExecutor`](https://executorlib.readthedocs.io/en/latest/1-single-node.html) | laptop, workstation or single compute node | `subprocess` | developing and testing a workflow | +| [`SlurmClusterExecutor`](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#slurm) | HPC login node | `sbatch` (one job per function) | long-running functions that should outlive the Python session | +| [`SlurmJobExecutor`](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm) | inside a SLURM allocation | `srun` (job steps) | many functions within one existing allocation | +| [`FluxClusterExecutor`](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#flux) | HPC login node or Flux instance | `flux submit` | long-running functions; disconnecting and reconnecting | +| [`FluxJobExecutor`](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#flux) | inside a Flux allocation | `flux run` | high-throughput execution of many short functions | + +The **Cluster** executors submit each Python function as an individual job and communicate via the file system, so the +Python process which created the executor can be closed and the results reloaded later. The **Job** executors run inside +an existing allocation and communicate via sockets, which has lower overhead and is the better choice for many short +function calls. + ## Documentation * [Installation](https://executorlib.readthedocs.io/en/latest/installation.html) * [Minimal](https://executorlib.readthedocs.io/en/latest/installation.html#minimal) @@ -173,6 +192,11 @@ as hierarchical job scheduler within the allocations. 
* [Application](https://executorlib.readthedocs.io/en/latest/application.html) * [GPAW](https://executorlib.readthedocs.io/en/latest/4-1-gpaw.html) * [Quantum Espresso](https://executorlib.readthedocs.io/en/latest/4-2-quantum-espresso.html) +* [Coupling with other Libraries](https://executorlib.readthedocs.io/en/latest/coupling.html) + * [emcee](https://executorlib.readthedocs.io/en/latest/coupling.html#emcee-markov-chain-monte-carlo) + * [pipefunc](https://executorlib.readthedocs.io/en/latest/coupling.html#pipefunc-function-pipelines) + * [omp4py](https://executorlib.readthedocs.io/en/latest/coupling.html#omp4py-openmp-for-python) + * [pylammpsmpi](https://executorlib.readthedocs.io/en/latest/coupling.html#pylammpsmpi-mpi-parallel-lammps) * [Trouble Shooting](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html) * [Filesystem Usage](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#filesystem-usage) * [Firewall Issues](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#firewall-issues) diff --git a/docs/_toc.yml b/docs/_toc.yml index d384441de..e10f734db 100644 --- a/docs/_toc.yml +++ b/docs/_toc.yml @@ -10,6 +10,7 @@ chapters: sections: - file: 4-1-gpaw.ipynb - file: 4-2-quantum-espresso.ipynb +- file: coupling.md - file: trouble_shooting.md - file: 5-developer.ipynb - file: api.rst diff --git a/docs/coupling.md b/docs/coupling.md new file mode 100644 index 000000000..88d0d0579 --- /dev/null +++ b/docs/coupling.md @@ -0,0 +1,170 @@ +# Coupling with other Libraries +A lot of scientific Python packages already know how to distribute work over many processes - they just need to be +handed an object that behaves like an executor or a worker pool. 
Because executorlib implements the +[Executor interface](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) of the Python standard +library, it can be passed to these packages in place of the +[ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or the +[multiprocessing.Pool](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool). The practical +benefit for you as a scientist is that the workflow stays exactly the same when you move from your laptop to a high +performance computer (HPC): you develop and test with the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) +and then switch to the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) or the +[HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) by changing a single class name. + +```{note} +The examples below require the respective third-party package to be installed in addition to executorlib (for example +`pip install emcee`). They are shown as reference code rather than executed examples, because these optional packages are +not part of the executorlib test environment. In every example the `SingleNodeExecutor` can be replaced by a +`FluxJobExecutor`, `FluxClusterExecutor`, `SlurmJobExecutor` or `SlurmClusterExecutor` to scale the same workflow to an +HPC cluster. +``` + +## emcee (Markov Chain Monte Carlo) +[emcee](https://emcee.readthedocs.io) is a widely used Python package for Markov Chain Monte Carlo (MCMC) sampling, for +example to estimate the posterior distribution of model parameters from experimental data. The likelihood function has +to be evaluated many times per sampling step, and these evaluations are independent of each other, so they can be +executed in parallel. The `EnsembleSampler` of emcee accepts any worker pool which provides a `map()` function via the +`pool` parameter. 
As executorlib provides this interface, an executorlib `Executor` can be used directly as a drop-in +replacement for the [multiprocessing.Pool](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool) +which is recommended in the [emcee parallelization tutorial](https://emcee.readthedocs.io/en/stable/tutorials/parallel/): +```python +import numpy as np +import emcee +from executorlib import SingleNodeExecutor + + +def log_prob(theta): + return -0.5 * np.sum(theta**2) + + +initial = np.random.randn(32, 5) +nwalkers, ndim = initial.shape + +with SingleNodeExecutor(block_allocation=True) as exe: + sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob, pool=exe) + sampler.run_mcmc(initial, 100, progress=True) +``` +Here the `block_allocation=True` parameter is set to reuse the same Python processes for the repeated evaluation of the +`log_prob()` function, which reduces the overhead for these many short function calls - +[block allocation](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation). For computationally +more expensive likelihood functions the parallel evaluation provides a substantial speed-up, and by replacing the +`SingleNodeExecutor` with a `FluxJobExecutor` the very same sampling can be distributed over multiple compute nodes of an +HPC cluster. + +## pipefunc (Function Pipelines) +[pipefunc](https://pipefunc.readthedocs.io) is a library to build function pipelines, where the output of one function +is used as the input for the next function, including map-reduce patterns over many parameters. pipefunc takes care of +the bookkeeping of the pipeline, while the actual execution of the individual functions is delegated to an executor. 
+The `map()` function of a pipefunc `Pipeline` accepts an `executor` parameter, which can either be a single executorlib +`Executor` or a dictionary which assigns a dedicated executor to each output: +```python +import numpy as np +from pipefunc import Pipeline, pipefunc +from executorlib import SingleNodeExecutor + + +@pipefunc(output_name="y", mapspec="x[i] -> y[i]") +def f(x): + return x**2 + + +@pipefunc(output_name="z", mapspec="y[i] -> z[i]") +def g(y): + return y + 1 + + +pipeline = Pipeline([f, g]) +inputs = {"x": [1, 2, 3]} + +with SingleNodeExecutor() as exe: + results = pipeline.map(inputs, executor=exe) + print(results["z"].output.tolist()) +``` +Assigning a different executor to each output enables fine-grained control over the computing resources. For example a +serial preprocessing step can be executed on a single core while a computationally expensive simulation step is +distributed over multiple compute nodes: +```python +executor = { + "y": SingleNodeExecutor(max_workers=2), + "z": SingleNodeExecutor(max_workers=4), +} +results = pipeline.map(inputs, executor=executor) +``` +The combination of pipefunc and executorlib is explained in more detail in the +[pipefunc documentation on execution and parallelism](https://pipefunc.readthedocs.io/en/latest/concepts/execution-and-parallelism/). + +## omp4py (OpenMP for Python) +The [thread based parallelism](https://executorlib.readthedocs.io/en/latest/1-single-node.html#thread-parallel-functions) +of executorlib is most commonly used to control the number of threads in linked libraries like NumPy. With +[omp4py](https://omp4py.readthedocs.io) - a Python implementation of [OpenMP](https://www.openmp.org) - it is also +possible to write thread parallel Python code directly. The number of threads assigned to the Python function is set via +the `threads_per_core` parameter in the `resource_dict`. 
The following example approximates the value of pi using a +parallel for loop with an OpenMP reduction: +```python +import random +from omp4py import omp +from executorlib import SingleNodeExecutor + + +@omp +def calc_pi(num_points): + count = 0 + with omp("parallel for reduction(+:count)"): + for i in range(num_points): + x = random.random() + y = random.random() + if x * x + y * y <= 1.0: + count += 1 + return 4 * (count / num_points) + + +with SingleNodeExecutor() as exe: + future = exe.submit(calc_pi, 10000000, resource_dict={"threads_per_core": 4}) + print(future.result()) +``` +The `threads_per_core` parameter sets the environment variables which control the number of threads, so the requested +number of cores is reserved for the threads created by omp4py inside the `calc_pi()` function. + +## pylammpsmpi (MPI-parallel LAMMPS) +[pylammpsmpi](https://pylammpsmpi.readthedocs.io) provides a Python interface to the molecular dynamics code +[LAMMPS](https://www.lammps.org) which distributes the simulation over multiple MPI ranks while the Python process +itself remains serial. Internally pylammpsmpi uses an executor to start the MPI-parallel LAMMPS processes, so an +executorlib `Executor` can be provided via the `executor` parameter. 
In combination with +[atomistics](https://atomistics.readthedocs.io) this can be used to run an MPI-parallel molecular dynamics simulation: +```python +from ase.build import bulk +from atomistics.calculators import ( + calc_molecular_dynamics_nvt_with_lammpslib, + get_potential_by_name, +) +from pylammpsmpi import LammpsASELibrary +from executorlib import SingleNodeExecutor + +structure = bulk("Ti") +potential = get_potential_by_name(potential_name="2016--Mendelev-M-I--Ti-3--LAMMPS--ipr1") + +with SingleNodeExecutor(resource_dict={"cores": 2}) as exe: + lmp = LammpsASELibrary(executor=exe) + result_dict = calc_molecular_dynamics_nvt_with_lammpslib( + structure=structure, + potential_dataframe=potential, + lmp=lmp, + ) + lmp.close() +``` +The `resource_dict={"cores": 2}` assigns two MPI ranks to the LAMMPS simulation. As for the other examples, replacing +the `SingleNodeExecutor` with one of the HPC executors distributes the LAMMPS simulation over the compute nodes of an +HPC cluster without any further changes to the simulation code. + +## General Pattern +The four examples above follow the same pattern: a library which already supports parallel execution accepts an +executorlib `Executor` (or worker pool), so executorlib takes over the distribution of the work. Whenever a Python +package accepts a [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) +or a [multiprocessing.Pool](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool) - typically +exposed via a parameter named `executor` or `pool` - it can be combined with executorlib. The recommended approach +remains the same in all cases: + +* Develop and test the workflow with the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) + on a laptop or workstation. 
+* Switch to the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) or the + [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) to scale to an HPC cluster by + changing only the executor class name. diff --git a/notebooks/1-single-node.ipynb b/notebooks/1-single-node.ipynb index ddec40132..55afeb339 100644 --- a/notebooks/1-single-node.ipynb +++ b/notebooks/1-single-node.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.13.13","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"6915218e-7cf3-4bf4-9618-7e6942b4762f","cell_type":"markdown","source":"# Single Node Executor\nThe `SingleNodeExecutor` in executorlib, is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to an high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. 
So the performance of executorlib improves when the individual Python function calls require extensive computations.\n\nAn advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard libary, is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions for example in the case of a Jupyter notebook. ","metadata":{}},{"id":"ccc686dd-8fc5-4755-8a19-f40010ebb1b8","cell_type":"markdown","source":"## Basic Functionality\nThe general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib.","metadata":{}},{"id":"b1907f12-7378-423b-9b83-1b65fc0a20f5","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"1654679f-38b3-4699-9bfe-b48cbde0b2db","cell_type":"markdown","source":"It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. This guarantees the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. 
A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:","metadata":{}},{"id":"16f7d138-ed77-45ea-a554-d329f7237500","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 35.7 ms, sys: 38.1 ms, total: 73.8 ms\nWall time: 274 ms\n"}],"execution_count":2},{"id":"a1109584-9db2-4f9d-b3ed-494d96241396","cell_type":"markdown","source":"As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. 
For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement.","metadata":{}},{"id":"cfccdf9a-b23b-4814-8c14-36703a8a5f9e","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[4, 6, 8]\nCPU times: user 36.6 ms, sys: 10 ms, total: 46.6 ms\nWall time: 1.16 s\n"}],"execution_count":3},{"id":"7db58f70-8137-4f1c-a87b-0d282f2bc3c5","cell_type":"markdown","source":"If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). ","metadata":{}},{"id":"abd0beb7-471d-490e-bb9c-96755bd7aacf","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n print(list(results))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[10, 12, 14]\nCPU times: user 35.4 ms, sys: 9.3 ms, total: 44.7 ms\nWall time: 1.2 s\n"}],"execution_count":4},{"id":"d07a3314-fbaf-4ff5-98dd-dea62194abcd","cell_type":"markdown","source":"The `map()` function also works naturally with [pandas](https://pandas.pydata.org) DataFrames. 
Rather than using `DataFrame.apply()` for row-wise operations, `map()` can be used to apply a function to each pair of values from two columns in parallel:","metadata":{}},{"id":"4e7d5ef4-6437-4fa2-b500-19b8cc53f377","cell_type":"code","source":"def sum_df(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"1428ac7c-dd3c-48db-b41f-4ea975df2d78","cell_type":"code","source":"import pandas\n\ndf = pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [4, 5, 6]})\n\nwith SingleNodeExecutor() as exe:\n df[\"c\"] = list(exe.map(sum_df, df[\"a\"], df[\"b\"]))\n print(df)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" a b c\n0 1 4 5\n1 2 5 7\n2 3 6 9\n"}],"execution_count":6},{"id":"ac86bf47-4eb6-4d7c-acae-760b880803a8","cell_type":"markdown","source":"These examples cover the general functionality of the `SingleNodeExecutor` class. Following the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library.","metadata":{}},{"id":"5de0f0f2-bf5c-46b3-8171-a3a206ce6775","cell_type":"markdown","source":"## Parallel Functions\nWriting parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode executorlib supports two levels of parallel execution, parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. 
","metadata":{}},{"id":"dc8e692f-bf6c-4838-bb82-6a6b8454a2e7","cell_type":"markdown","source":"### MPI Parallel Functions\nMPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to an `SingleNodeExecutor` for evaluation.\n\nThe following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n\nThe [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section.","metadata":{}},{"id":"a251d083-489e-41c1-9e49-c86093858006","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":7},{"id":"adbf8a10-04e1-4fd9-8768-4375bcba9ec3","cell_type":"markdown","source":"The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The reseource dictionary can either be provided as additional parameter for the `submit()` function. 
It is important that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:","metadata":{}},{"id":"266864f1-d29e-4934-9b5d-51f4ffb11f5c","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":8},{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again.","metadata":{}},{"id":"cb4ad978-bdf2-47bb-a7df-846641a54ec2","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":9},{"id":"c1d1d7b1-64fa-4e47-bbde-2a16036568d6","cell_type":"markdown","source":"In addition, to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource \nManagement (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments with the `slurm_cmd_args` parameter - [resource 
dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).","metadata":{}},{"id":"4f5c5221-d99c-4614-82b1-9d6d3260c1bf","cell_type":"markdown","source":"### Thread Parallel Functions\nAn alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the cPython implementation a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`.","metadata":{}},{"id":"7a7d21f6-9f1a-4f30-8024-9993e156dc75","cell_type":"code","source":"def calc_with_threads(i):\n import os\n\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":10},{"id":"82ed8f46-836c-402e-9363-be6e16c2a0b0","cell_type":"markdown","source":"Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:","metadata":{}},{"id":"b8ed330d-ee77-44a0-a02f-670fa945b043","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n 
print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":11},{"id":"63222cd5-664b-4aba-a80c-5814166b1239","cell_type":"markdown","source":"Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:","metadata":{}},{"id":"31562f89-c01c-4e7a-bbdd-fa26ca99e68b","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n fs = exe.submit(calc_with_threads, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":12},{"id":"8b78a7b4-066e-4cbc-858e-606c8bbbbf0c","cell_type":"markdown","source":"For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n\nBeyond MPI based parallelism and thread based parallelism the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions.","metadata":{}},{"id":"ca9bc450-2762-4d49-b7f8-48cc83e068fd","cell_type":"markdown","source":"## Performance Optimization\nThe default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. 
It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.","metadata":{}},{"id":"e9b52ecf-3984-4695-98e7-315aa3712104","cell_type":"markdown","source":"### Block Allocation\nBy default each submitted Python function is executed in a dedicated process. This gurantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `Ture`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. ","metadata":{}},{"id":"0da4c7d0-2268-4ea8-b62d-5d94c79ebc72","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 31.5 ms, sys: 11.5 ms, total: 43 ms\nWall time: 709 ms\n"}],"execution_count":13},{"id":"d38163b3-1c04-431c-964b-2bad4f823a4d","cell_type":"markdown","source":"The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. 
Starting again with the `calc_mpi()` function: ","metadata":{}},{"id":"cb8c4943-4c78-4203-95f2-1db758e588d9","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":14},{"id":"9e1212c4-e3fb-4e21-be43-0a4f0a08b856","cell_type":"markdown","source":"Still the resource dictionary parameter can still be set during the initialisation of the `SingleNodeExecutor` class. Internally, this groups the created Python processes in fixed allocations and afterwards submit Python functions to these allocations.","metadata":{}},{"id":"5ebf7195-58f9-40f2-8203-2d4b9f0e9602","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n fs = exe.submit(calc_mpi, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":15},{"id":"b75fb95f-f2f5-4be9-9f2a-9c2e9961c644","cell_type":"markdown","source":"The weakness of memory from a previous Python function remaining in the Python process can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. 
When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.","metadata":{}},{"id":"8aa754cc-eb1a-4fa1-bd72-272246df1d2f","cell_type":"code","source":"def init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":16},{"id":"1854895a-7239-4b30-b60d-cf1a89234464","cell_type":"code","source":"def calc_with_preload(i, j, k):\n return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":17},{"id":"d07cf107-3627-4cb0-906c-647497d6e0d2","cell_type":"markdown","source":"The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. 
For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc_with_preload()` function does not define the `l` parameter this one is also ignored.","metadata":{}},{"id":"cc648799-a0c6-4878-a469-97457bce024f","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:\n fs = exe.submit(calc_with_preload, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":18},{"id":"1073b8ca-1492-46e9-8d1f-f52ad48d28a2","cell_type":"markdown","source":"The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function.","metadata":{}},{"id":"24397d78-dff1-4834-830c-a8f390fe6b9c","cell_type":"markdown","source":"### Cache\nThe development of scientific workflows is commonly an interactive process, extending the functionality step by step. This lead to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take in the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent future Futures](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org), which is an optional dependency for executorlib. 
\n\nThe [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. ","metadata":{}},{"id":"ecdcef49-5c89-4538-b377-d53979673bf7","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 84.3 ms, sys: 33.6 ms, total: 118 ms\nWall time: 1.35 s\n"}],"execution_count":19},{"id":"32d0fb2e-5ac1-4249-b6c8-953c92fdfded","cell_type":"markdown","source":"When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n\nStill it is important to mention, that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations, the cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution. 
","metadata":{}},{"id":"c39babe8-4370-4d31-9520-9a7ce63378c8","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 44.2 ms, sys: 15.5 ms, total: 59.7 ms\nWall time: 1.3 s\n"}],"execution_count":20},{"id":"5144a035-633e-4e60-a362-f3b15b28848b","cell_type":"markdown","source":"An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function the results of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:","metadata":{}},{"id":"f574b9e1-de55-4e38-aef7-a4bed540e040","cell_type":"code","source":"import pandas\nfrom executorlib import get_cache_data\n\ndf = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\ndf","metadata":{"trusted":true},"outputs":[{"execution_count":21,"output_type":"execute_result","data":{"text/plain":" function input_args input_kwargs output resource_dict \\\n0 ([2, 2],) {} 4 {} \n1 ([3, 3],) {} 6 {} \n2 ([1, 1],) {} 2 {} \n\n runtime filename \n0 0.910742 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f... \n1 0.911247 /home/jovyan/file/sum0f7710227cda6456e5d071877... \n2 0.908284 /home/jovyan/file/sumf5ad27b855231a293ddd735a8... ","text/html":"
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
functioninput_argsinput_kwargsoutputresource_dictruntimefilename
0<built-in function sum>([2, 2],){}4{}0.910742/home/jovyan/file/sum89afbdf9da5eb1794f6976a3f...
1<built-in function sum>([3, 3],){}6{}0.911247/home/jovyan/file/sum0f7710227cda6456e5d071877...
2<built-in function sum>([1, 1],){}2{}0.908284/home/jovyan/file/sumf5ad27b855231a293ddd735a8...
\n
"},"metadata":{}}],"execution_count":21},{"id":"68092479-e846-494a-9ac9-d9638b102bd8","cell_type":"markdown","source":"After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repeation of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process concluded. ","metadata":{}},{"id":"34a9316d-577f-4a63-af14-736fb4e6b219","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":22},{"id":"1cea95b5-4110-444c-82af-fa6718bfa17f","cell_type":"markdown","source":"Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. 
","metadata":{}},{"id":"71a8a0be-a933-4e83-9da5-50da35e9975b","cell_type":"markdown","source":"### Dependencies\nMany scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function.","metadata":{}},{"id":"d8b75a26-479d-405e-8895-a8d56b3f0f4b","cell_type":"code","source":"def calc_add(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":23},{"id":"36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078","cell_type":"markdown","source":"For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. 
\n\nThe important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `SingleNodeExecutor`.","metadata":{}},{"id":"35fd5747-c57d-4926-8d83-d5c55a130ad6","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":24},{"id":"38e1bbb3-1028-4f50-93c1-d2427f399a7d","cell_type":"markdown","source":"As the reusing of existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting the `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n\nNo computation is executed when the `plot_dependency_graph=True` is set. This parameter is for debugging only. \n\nInternally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. 
","metadata":{}},{"id":"f67470b5-af1d-4add-9de8-7f259ca67324","cell_type":"code","source":"with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(calc_add, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"None\n"},{"output_type":"display_data","data":{"text/plain":"","image/svg+xml":"\n\n\n\n\n0\n\ncalc_add\n\n\n\n1\n\ncalc_add\n\n\n\n0->1\n\n\nb\n\n\n\n2\n\ncalc_add\n\n\n\n1->2\n\n\nb\n\n\n\n3\n\n1\n\n\n\n3->0\n\n\na\n\n\n\n4\n\n0\n\n\n\n4->0\n\n\nb\n\n\n\n5\n\n2\n\n\n\n5->1\n\n\na\n\n\n\n6\n\n3\n\n\n\n6->2\n\n\na\n\n\n"},"metadata":{}}],"execution_count":25},{"id":"4c952a9d-2b58-401d-b7ec-fda740db774a","cell_type":"markdown","source":"## Advanced Scheduling\nGoing beyond just directed acyclic graphs (DAG) with one-to-many and many-to-one relationships, executorlib provides a number of advanced scheduling patterns. These are briefly introduced in the following.","metadata":{}},{"id":"4db2c87c-9a7d-4074-82d0-24357fa4f0e6","cell_type":"markdown","source":"### Runtime-dependent Batching \nTo maximize the throughput of dependent calculation tasks its important to idenify all tasks which can be executed at a given moment. Unfortunately, some of these dependencies can only be determined at run time, which is challenging for most schedulers. To demonstrate the runtime-dependent batching in executorlib we discuss the following example. Starting with a group of ten tasks and then grouping them into groups of three for processing. Still the order of the tasks, which tasks belong into which group, is only determined at run time. 
\n\nFor simplicity, we just use a simple function which directly returns the input.","metadata":{}},{"id":"857a5e0f-50d6-45ec-aac7-a7151f36c19f","cell_type":"code","source":"def reply(i):\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":26},{"id":"544dacab-5601-4a9b-814f-88560eafb079","cell_type":"markdown","source":"After the group of ten tasks is submitted their future objects are stored in a list named `future_individual_lst`. This list is then provided to the `batched()` function of the `SingleNodeExecutor()` to generate batches of tasks which are then provided to the `sum` function for further processing. The results of this second step are stored in the `future_group_lst`. Finally, the results of these future objects are evaluated in the third step. ","metadata":{}},{"id":"17d1354a-0943-4b62-9b0d-7d39c8df23f2","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future_individual_lst = [\n exe.submit(reply, i) for i in range(10)\n ]\n future_group_lst = [\n exe.submit(sum, f) for f in exe.batched(future_individual_lst, n=3)\n ]\n print(sum([f.result() for f in future_group_lst]))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"45\n"}],"execution_count":27},{"id":"80e8b154-d659-4f3e-add2-701a454d770d","cell_type":"markdown","source":"### Split Future Objects\nIn analogy to the `batched()` function which combines multiple future objects in a batch for further processing, it is also necessary to split the future objects even before the evaluation is completed. Executorlib provides two utility functions, namely `split_future()` for tuples and lists and `get_item_from_future()` for dictionaries. 
\n\nStarting with a function which returns a tuple, named `get_a_tuple()`:","metadata":{}},{"id":"6ef541e3-a2be-4e05-9e77-65eaceff4248","cell_type":"code","source":"def get_a_tuple(i):\n return \"a\", \"b\", i","metadata":{"trusted":true},"outputs":[],"execution_count":28},{"id":"dd3e4312-1594-4b3b-baf3-5b3121ca8910","cell_type":"markdown","source":"This function is submitted to the `SingleNodeExecutor()` and while in this case it directly returns the tuple, the evaluation would commonly take much longer. By having the ability to split the output of the future object using the `split_future()` function, a number of future objects is generated one for each element of the tuple. This is enabled by providing the number of elements in the tuple as an additional parameter `n=3`. ","metadata":{}},{"id":"927c5931-d7fd-4bc1-ae0e-3dd2f4bc99ce","cell_type":"code","source":"from executorlib import split_future","metadata":{"trusted":true},"outputs":[],"execution_count":29},{"id":"64a9b46e-0de6-4abb-99d9-8aaa0c6cbccb","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future = exe.submit(get_a_tuple, 15)\n f1, f2, f3 = split_future(future=future, n=3)\n print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"a b 15\n"}],"execution_count":30},{"id":"a10d8f90-e790-4708-b193-90a7f014d699","cell_type":"markdown","source":"In analogy, to the `split_future()` function for lists and tuples, the `get_item_from_future()` function returns one item of a dicitionary which is returned by a function submitted to an `Executor()`. In this example the `get_a_dict()` function returns a dictionary, again in this example the dictionary is returned directly while commonly this would take much longer. 
","metadata":{}},{"id":"51ec4dbf-d009-41c9-bc04-cf1b16fe782f","cell_type":"code","source":"def get_a_dict(i):\n return {\"a\": 1, \"b\": 2, \"c\": i}","metadata":{"trusted":true},"outputs":[],"execution_count":31},{"id":"5c90ab75-2a6b-4fbe-a7ee-7f91b2b4af43","cell_type":"markdown","source":"The `get_a_dict()` function is submitted to the `SingleNodeExecutor()`, it returns a future object named `future_dict`. Still as we know that the `result()` of this future object `future_dict` returns a dictionary, we can already access the items of this dictionary with the `get_item_from_future()` function. The `get_item_from_future()` function takes a future object as input in addition to the `key` of the dictionary which should be accessed, as a result the `get_item_from_future()` function returns a future object for the value related to the key. In this example these future objects are named `f1`, `f2` and `f3`. These fucture objects are evaluated afterwards.","metadata":{}},{"id":"a41251d2-a840-4c12-9625-c2e0585c489b","cell_type":"code","source":"from executorlib import get_item_from_future","metadata":{"trusted":true},"outputs":[],"execution_count":32},{"id":"a6e75c5e-ad0c-4723-99c5-8125b63fbeaf","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future_dict = exe.submit(get_a_dict, 15)\n f1 = get_item_from_future(future=future_dict, key=\"a\")\n f2 = get_item_from_future(future=future_dict, key=\"b\")\n f3 = get_item_from_future(future=future_dict, key=\"c\")\n print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"1 2 15\n"}],"execution_count":33},{"id":"a945c98d-dc73-4284-9d8c-52475a760f6b","cell_type":"markdown","source":"### Recursion\nIn addition to the `map()` function for parallel loops and the option to integrate `while` loops directly in the submission, it is sometimes helpful to use recursive algorthims to accelerate scientific simulation and analysis. 
To demonstrate the use of executorlib in combination with recursive algorithms, the quicksort algorithm is demonstrated. \n\nThe quicksort algorithm is implemented in two parts, a `quick_sort()` which splits the input `sequence` into two sets, one lower and one larger than the pivot element of the `sequence`. After the initial execution the `quick_sort()` function is again applied to both resulting sets until only a single element remains in the `sequence`. In this case the single element is returned. The pivot is defined as the first element of the `sequence`.\n\nTo simplify the submission of the `quick_sort()` function to the `SingleNodeExecutor()` dictionaries are used as return types, these dictionaries either contain only a single item with the key `\"result\"` when only a single element remains in the list, or three items, one `\"left\"` for a list lower than the pivot, one `\"right\"` for the items larger than the pivot and the `\"result\"` for the pivot itself. ","metadata":{}},{"id":"08b0c0a1-34ad-4b2a-847c-fe9c96956b27","cell_type":"code","source":"def quick_sort(sequence):\n length = len(sequence)\n if length <= 1:\n return {\"result\": sequence}\n else:\n pivot = sequence.pop() \n \n greater_items = []\n lesser_items = []\n\n for item in sequence:\n if item > pivot:\n greater_items.append(item)\n else:\n lesser_items.append(item)\n\n return {\"left\": lesser_items, \"right\": greater_items, \"result\": [pivot]}","metadata":{"trusted":true},"outputs":[],"execution_count":34},{"id":"54a8a765-0b9f-424a-86e5-8024c4d59018","cell_type":"markdown","source":"To enable the recursive submission to the `SingleNodeExecutor()` a `recusive_submit()` function is defined using the [asynchronous IO module](https://docs.python.org/3/library/asyncio.html) of the Python standard libary, indicated by the `async` keyword ahead of the function definition. It takes a `function`, the `sequence` and the `executor` as inputs. 
As a first step the function is submitted to the `executor` and the execution is halted until the first result is available by wrapping the future object using the `wrap_future()` function of the `asyncio` package and appling the `await` function. Then the corresponding result dictionary `result_dict` is evaluated: If the `result_dict` contains more than one key, then the `recusive_submit()` is evaluated for both the left side with lower values than the pivot element and the right side with elements higher than the pivot element. Again in both cases the `await` keyword is used to enable the parallel execution of both branches at the same time. Afterwards the result of both sides is combined with the pivot element. For the case that the dictionary only contains a single element, it is returned directly. ","metadata":{}},{"id":"4e142829-c9de-4969-ab3c-97f3e1ada90c","cell_type":"code","source":"import asyncio\n\nasync def recusive_submit(function, sequence, executor):\n result_dict = await asyncio.wrap_future(exe.submit(function, sequence))\n if len(result_dict) > 1:\n left = await recusive_submit(function=function, sequence=result_dict[\"left\"], executor=executor)\n right = await recusive_submit(function=function, sequence=result_dict[\"right\"], executor=executor)\n return left + result_dict[\"result\"] + right\n else:\n return result_dict[\"result\"]","metadata":{"trusted":true},"outputs":[],"execution_count":35},{"id":"531a8f7d-5b21-46d0-b632-32a1568571c8","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n loop = asyncio.get_event_loop()\n task = loop.create_task(recusive_submit(function=quick_sort, sequence=[0,9,3,8,2,7,5], executor=exe))\n print(await task)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[0, 2, 3, 5, 7, 8, 9]\n"}],"execution_count":36},{"id":"725c8ac2-27e4-495b-b061-1fd6053f4c0b","cell_type":"markdown","source":"## Testing and Debugging\nThe up-scaling of Python functions from a single 
workstation to an High Performance Computer (HPC) can be challenging, so executorlib provides a number of debugging utilities to help you optimize your functions for execution with executorlib. ","metadata":{}},{"id":"2c96b460-ebd2-4c8a-a5e1-922a860808ab","cell_type":"markdown","source":"### Measure Data Transferred \nTransferring a large amount of data between two processes requires additional resources so it is helpful to measure the data transferred between the frontend and backend process. This is achieved by setting the `log_obj_size` parameter to `True`:","metadata":{}},{"id":"71905acf-2fef-4738-82ae-89e35e2e3d2d","cell_type":"code","source":"from executorlib import SingleNodeExecutor\n\nwith SingleNodeExecutor(log_obj_size=True) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stderr","output_type":"stream","text":"Send dictionary of size: 101\nReceived dictionary of size: 59\nSend dictionary of size: 69\nReceived dictionary of size: 58\n"},{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":37},{"id":"0e9f7192-2b7d-4caf-b2b8-ae9cb060e90f","cell_type":"markdown","source":"### Write Log Files\nLibraries like executorlib are commonly used to sample a large parameter space, in this case it is possible that out of a large number of parameters one combination throws an error. This error can be logged in a file which also contains the function and input parameters using the `\"error_log_file\"` parameter in the `resource_dict`. 
This allows to change the log file on a per-function bases.","metadata":{}},{"id":"2810acd5-a46e-4d85-b447-945248ffca15","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":38},{"id":"ef6b87a7-361e-4a3f-bad3-72ad87968e8c","cell_type":"code","source":"def my_funct(i, j): \n if i == 2 and j == 2:\n raise ValueError()\n else: \n return i * j + i + j","metadata":{"trusted":true},"outputs":[],"execution_count":39},{"id":"209ff76b-913a-46f7-9fff-250e207898b2","cell_type":"markdown","source":"A try and except statement is added to prevent the jupyter notebook from crashing:","metadata":{}},{"id":"30381a00-30cc-45a2-82ff-17371339f5c7","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"error_log_file\": \"error.out\"}) as exe:\n future_lst = []\n for i in range(4):\n for j in range(4):\n future_lst.append(exe.submit(my_funct, i=i, j=j))\n try:\n print([f.result() for f in future_lst])\n except ValueError:\n pass","metadata":{"trusted":true},"outputs":[],"execution_count":40},{"id":"36bfc556-aa86-4b50-972b-4b8f68ceec3d","cell_type":"markdown","source":"The content of the log file is a basic text file, so it can be read with any kind of file utility. 
The important part is that the log file contains not only the error message but in addition also the function name and the input parameters in the case `kwargs: {'i': 2, 'j': 2}` which helps for future debugging of the sampling function.","metadata":{}},{"id":"a61c06fc-3029-48ed-895f-9f6c065b7c47","cell_type":"code","source":"with open(\"error.out\") as f:\n content = f.readlines()","metadata":{"trusted":true},"outputs":[],"execution_count":41},{"id":"6a6cd10f-6fe3-4a17-a8e6-2536feb9a11a","cell_type":"code","source":"content","metadata":{"trusted":true},"outputs":[{"execution_count":42,"output_type":"execute_result","data":{"text/plain":"['function: \\n',\n 'args: ()\\n',\n \"kwargs: {'i': 2, 'j': 2}\\n\",\n 'Traceback (most recent call last):\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/backend/interactive_serial.py\", line 56, in main\\n',\n ' output = call_funct(input_dict=input_dict, funct=None, memory=memory)\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 33, in call_funct\\n',\n ' return funct(input_dict[\"fn\"], *input_dict[\"args\"], **input_dict[\"kwargs\"])\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 22, in funct\\n',\n ' return args[0].__call__(*args[1:], **kwargs)\\n',\n ' ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^\\n',\n ' File \"/tmp/ipykernel_463/3167739528.py\", line 3, in my_funct\\n',\n 'ValueError\\n']"},"metadata":{}}],"execution_count":42},{"id":"5aff120a-317f-47d4-9639-8eccfb136117","cell_type":"code","source":"import os\n\nif os.path.exists(\"error.out\"):\n os.remove(\"error.out\")","metadata":{"trusted":true},"outputs":[],"execution_count":43},{"id":"44cd41d7-4417-429e-8481-f2a49e5f769c","cell_type":"markdown","source":"### TestClusterExecutor\nWhile the `SingleNodeExecutor` internally behaves very similar to the `FluxJobExecutor` and `SlurmJobExecutor` 
the `FluxClusterExecutor` and `SlurmClusterExecutor` behave very different as they use the file system to exchange information rather than socket-based communication. This can lead to complications when it comes to debugging. To address this challenge executorlib provides the `TestClusterExecutor` which can be executed on a local workstation just like the `SingleNodeExecutor` but in the background it uses the same file based communication like the `SlurmClusterExecutor` and the `FluxClusterExecutor`:","metadata":{}},{"id":"2ae29fb0-54e5-468f-8727-a179b6ed363e","cell_type":"code","source":"from executorlib.api import TestClusterExecutor\n\nwith TestClusterExecutor(cache_directory=\"test\") as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":44}]} \ No newline at end of file +{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.13.13","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"6915218e-7cf3-4bf4-9618-7e6942b4762f","cell_type":"markdown","source":"# Single Node Executor\nThe `SingleNodeExecutor` in executorlib is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to a high performance computer (HPC). With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. 
So the performance of executorlib improves when the individual Python function calls require extensive computations.\n\nAn advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard library is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as the serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions, for example in a Jupyter notebook. ","metadata":{}},{"id":"ccc686dd-8fc5-4755-8a19-f40010ebb1b8","cell_type":"markdown","source":"## Basic Functionality\nThe general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. You can import the `SingleNodeExecutor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `SingleNodeExecutor` class to start using executorlib.","metadata":{}},{"id":"b1907f12-7378-423b-9b83-1b65fc0a20f5","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"1654679f-38b3-4699-9bfe-b48cbde0b2db","cell_type":"markdown","source":"It is recommended to use the `SingleNodeExecutor` class in combination with a `with`-statement. This guarantees that the processes created by the `SingleNodeExecutor` class to evaluate the Python functions are closed afterwards and do not remain as ghost processes. 
A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:","metadata":{}},{"id":"16f7d138-ed77-45ea-a554-d329f7237500","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 35.7 ms, sys: 38.1 ms, total: 73.8 ms\nWall time: 274 ms\n"}],"execution_count":2},{"id":"a1109584-9db2-4f9d-b3ed-494d96241396","cell_type":"markdown","source":"As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `SingleNodeExecutor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `SingleNodeExecutor` class once and then submit a number of functions. This can be achieved with a loop. 
For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `SingleNodeExecutor()` class as provided by the `with`-statement.","metadata":{}},{"cell_type":"markdown","id":"9d4d4481-b637-4b15-aa14-0de62199411f","metadata":{},"source":["### Understanding Future Objects\n","Each call of the `submit()` function returns immediately - not with the result of the function but with a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object. A `Future` is a placeholder for a result which is not yet available. The submitted function is executed in the background while the main Python program continues, this is what allows executorlib to evaluate many functions at the same time. The most important methods of a `Future` object are:\n","\n","| Method | Description |\n","|---|---|\n","| `future.result()` | Wait until the function is finished and return its result - or raise the exception if the function failed. |\n","| `future.done()` | Return `True` if the function already finished, without waiting for it. |\n","| `future.cancel()` | Try to cancel the function in case it has not started yet. |\n","\n","The key to parallel execution is to first submit all functions and only afterwards collect the results by calling `result()`. When `result()` is called directly after each `submit()`, the program waits for each function to finish before the next function is submitted, so the functions are executed one after another rather than in parallel. 
The example below illustrates this: directly after the submission the function is not yet finished, while after requesting the result with `result()` it is."]},{"cell_type":"code","execution_count":null,"id":"b653ab65-b6fb-40a2-999c-fa2a16f5d9aa","metadata":{"trusted":true},"outputs":[],"source":["with SingleNodeExecutor() as exe:\n"," future = exe.submit(sum, [1, 1])\n"," print(\"finished directly after submit:\", future.done())\n"," print(\"result:\", future.result())\n"," print(\"finished after result:\", future.done())"]},{"id":"cfccdf9a-b23b-4814-8c14-36703a8a5f9e","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[4, 6, 8]\nCPU times: user 36.6 ms, sys: 10 ms, total: 46.6 ms\nWall time: 1.16 s\n"}],"execution_count":3},{"id":"7db58f70-8137-4f1c-a87b-0d282f2bc3c5","cell_type":"markdown","source":"If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). 
","metadata":{}},{"id":"abd0beb7-471d-490e-bb9c-96755bd7aacf","cell_type":"code","source":"%%time\nwith SingleNodeExecutor() as exe:\n results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n print(list(results))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[10, 12, 14]\nCPU times: user 35.4 ms, sys: 9.3 ms, total: 44.7 ms\nWall time: 1.2 s\n"}],"execution_count":4},{"id":"d07a3314-fbaf-4ff5-98dd-dea62194abcd","cell_type":"markdown","source":"The `map()` function also works naturally with [pandas](https://pandas.pydata.org) DataFrames. Rather than using `DataFrame.apply()` for row-wise operations, `map()` can be used to apply a function to each pair of values from two columns in parallel:","metadata":{}},{"id":"4e7d5ef4-6437-4fa2-b500-19b8cc53f377","cell_type":"code","source":"def sum_df(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"1428ac7c-dd3c-48db-b41f-4ea975df2d78","cell_type":"code","source":"import pandas\n\ndf = pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [4, 5, 6]})\n\nwith SingleNodeExecutor() as exe:\n df[\"c\"] = list(exe.map(sum_df, df[\"a\"], df[\"b\"]))\n print(df)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" a b c\n0 1 4 5\n1 2 5 7\n2 3 6 9\n"}],"execution_count":6},{"id":"ac86bf47-4eb6-4d7c-acae-760b880803a8","cell_type":"markdown","source":"These examples cover the general functionality of the `SingleNodeExecutor` class. Following the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library.","metadata":{}},{"id":"5de0f0f2-bf5c-46b3-8171-a3a206ce6775","cell_type":"markdown","source":"## Parallel Functions\nWriting parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. 
In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode, executorlib supports two levels of parallel execution: parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, and thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. ","metadata":{}},{"id":"dc8e692f-bf6c-4838-bb82-6a6b8454a2e7","cell_type":"markdown","source":"### MPI Parallel Functions\nMPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to a `SingleNodeExecutor` for evaluation.\n\nThe following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of MPI processes in the current MPI group with `MPI.COMM_WORLD.Get_size()` and the index (rank) of the current process in the MPI group with `MPI.COMM_WORLD.Get_rank()`.\n\nThe [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. 
The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section.","metadata":{}},{"id":"a251d083-489e-41c1-9e49-c86093858006","cell_type":"code","source":"def calc_mpi(i):\n    from mpi4py import MPI\n\n    size = MPI.COMM_WORLD.Get_size()\n    rank = MPI.COMM_WORLD.Get_rank()\n    return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":7},{"id":"adbf8a10-04e1-4fd9-8768-4375bcba9ec3","cell_type":"markdown","source":"The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The resource dictionary can be provided as an additional parameter of the `submit()` function. Note that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used as a parameter name in the function which is submitted, like the `calc_mpi()` function in this example:","metadata":{}},{"id":"266864f1-d29e-4934-9b5d-51f4ffb11f5c","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":8},{"id":"3a449e3f-d7a4-4056-a1d0-35dfca4dad22","cell_type":"markdown","source":"Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. 
In this case it is internally set for every call of the `submit()` function, without the need to specify it again.","metadata":{}},{"id":"cb4ad978-bdf2-47bb-a7df-846641a54ec2","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n    fs = exe.submit(calc_mpi, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":9},{"id":"c1d1d7b1-64fa-4e47-bbde-2a16036568d6","cell_type":"markdown","source":"In addition to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally, for the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) queuing system, the option to provide additional command line arguments with the `slurm_cmd_args` parameter. The full list is documented in the [resource dictionary](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) section.","metadata":{}},{"id":"4f5c5221-d99c-4614-82b1-9d6d3260c1bf","cell_type":"markdown","source":"### Thread Parallel Functions\nAn alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the CPython implementation, a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. 
Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`.","metadata":{}},{"id":"7a7d21f6-9f1a-4f30-8024-9993e156dc75","cell_type":"code","source":"def calc_with_threads(i):\n import os\n\n os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n import numpy as np\n\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":10},{"id":"82ed8f46-836c-402e-9363-be6e16c2a0b0","cell_type":"markdown","source":"Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:","metadata":{}},{"id":"b8ed330d-ee77-44a0-a02f-670fa945b043","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":11},{"id":"63222cd5-664b-4aba-a80c-5814166b1239","cell_type":"markdown","source":"Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:","metadata":{}},{"id":"31562f89-c01c-4e7a-bbdd-fa26ca99e68b","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n fs = exe.submit(calc_with_threads, 3)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"3\n"}],"execution_count":12},{"id":"8b78a7b4-066e-4cbc-858e-606c8bbbbf0c","cell_type":"markdown","source":"For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be 
executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n\nBeyond MPI based parallelism and thread based parallelism the [HPC Cluster Executors](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) and the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) also provide the option to assign GPUs to the execution of individual Python functions.","metadata":{}},{"id":"ca9bc450-2762-4d49-b7f8-48cc83e068fd","cell_type":"markdown","source":"## Performance Optimization\nThe default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.","metadata":{}},{"id":"e9b52ecf-3984-4695-98e7-315aa3712104","cell_type":"markdown","source":"### Block Allocation\nBy default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. 
","metadata":{}},{"id":"0da4c7d0-2268-4ea8-b62d-5d94c79ebc72","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\nCPU times: user 31.5 ms, sys: 11.5 ms, total: 43 ms\nWall time: 709 ms\n"}],"execution_count":13},{"id":"d38163b3-1c04-431c-964b-2bad4f823a4d","cell_type":"markdown","source":"The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. Starting again with the `calc_mpi()` function: ","metadata":{}},{"id":"cb8c4943-4c78-4203-95f2-1db758e588d9","cell_type":"code","source":"def calc_mpi(i):\n    from mpi4py import MPI\n\n    size = MPI.COMM_WORLD.Get_size()\n    rank = MPI.COMM_WORLD.Get_rank()\n    return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":14},{"id":"9e1212c4-e3fb-4e21-be43-0a4f0a08b856","cell_type":"markdown","source":"The resource dictionary parameter can, however, still be set during the initialisation of the `SingleNodeExecutor` class. 
Internally, this groups the created Python processes in fixed allocations and afterwards submits Python functions to these allocations.","metadata":{}},{"id":"5ebf7195-58f9-40f2-8203-2d4b9f0e9602","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n    fs = exe.submit(calc_mpi, 3)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":15},{"id":"b75fb95f-f2f5-4be9-9f2a-9c2e9961c644","cell_type":"markdown","source":"With block allocation, memory from a previous Python function remains in the Python process. This weakness can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.","metadata":{}},{"id":"8aa754cc-eb1a-4fa1-bd72-272246df1d2f","cell_type":"code","source":"def init_function():\n    return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":16},{"id":"1854895a-7239-4b30-b60d-cf1a89234464","cell_type":"code","source":"def calc_with_preload(i, j, k):\n    return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":17},{"id":"d07cf107-3627-4cb0-906c-647497d6e0d2","cell_type":"markdown","source":"The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided `fs = exe.submit(calc_with_preload, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. 
So the `SingleNodeExecutor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided, so the value `j=4` from the `init_function()` is not used. The missing parameter `k=3` is taken from the `init_function()`, and as the `calc_with_preload()` function does not define the `l` parameter this one is ignored as well.","metadata":{}},{"id":"cc648799-a0c6-4878-a469-97457bce024f","cell_type":"code","source":"with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:\n    fs = exe.submit(calc_with_preload, 2, j=5)\n    print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":18},{"id":"1073b8ca-1492-46e9-8d1f-f52ad48d28a2","cell_type":"markdown","source":"The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function.","metadata":{}},{"id":"24397d78-dff1-4834-830c-a8f390fe6b9c","cell_type":"markdown","source":"### Cache\nThe development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take in the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. 
Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org) package. \n\nThe [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. ","metadata":{}},{"id":"ecdcef49-5c89-4538-b377-d53979673bf7","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n    print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 84.3 ms, sys: 33.6 ms, total: 118 ms\nWall time: 1.35 s\n"}],"execution_count":19},{"id":"32d0fb2e-5ac1-4249-b6c8-953c92fdfded","cell_type":"markdown","source":"When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n\nStill it is important to mention that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations; the cache is only designed to restore previous calculation results after the Python process managing executorlib was stopped following a successful execution. 
","metadata":{}},{"id":"c39babe8-4370-4d31-9520-9a7ce63378c8","cell_type":"code","source":"%%time\nwith SingleNodeExecutor(cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\nCPU times: user 44.2 ms, sys: 15.5 ms, total: 59.7 ms\nWall time: 1.3 s\n"}],"execution_count":20},{"id":"5144a035-633e-4e60-a362-f3b15b28848b","cell_type":"markdown","source":"An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the `get_cache_data()` function the results of each Python function is converted to a dictionary. This list of dictionaries can be converted to a `pandas.DataFrame` for further processing:","metadata":{}},{"id":"f574b9e1-de55-4e38-aef7-a4bed540e040","cell_type":"code","source":"import pandas\nfrom executorlib import get_cache_data\n\ndf = pandas.DataFrame(get_cache_data(cache_directory=\"./file\"))\ndf","metadata":{"trusted":true},"outputs":[{"execution_count":21,"output_type":"execute_result","data":{"text/plain":" function input_args input_kwargs output resource_dict \\\n0 ([2, 2],) {} 4 {} \n1 ([3, 3],) {} 6 {} \n2 ([1, 1],) {} 2 {} \n\n runtime filename \n0 0.910742 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f... \n1 0.911247 /home/jovyan/file/sum0f7710227cda6456e5d071877... \n2 0.908284 /home/jovyan/file/sumf5ad27b855231a293ddd735a8... ","text/html":"
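<div>\n<table border=\"1\" class=\"dataframe\">\n  <thead>\n    <tr style=\"text-align: right;\">\n      <th></th>\n      <th>function</th>\n      <th>input_args</th>\n      <th>input_kwargs</th>\n      <th>output</th>\n      <th>resource_dict</th>\n      <th>runtime</th>\n      <th>filename</th>\n    </tr>\n  </thead>\n  <tbody>\n    <tr>\n      <th>0</th>\n      <td>&lt;built-in function sum&gt;</td>\n      <td>([2, 2],)</td>\n      <td>{}</td>\n      <td>4</td>\n      <td>{}</td>\n      <td>0.910742</td>\n      <td>/home/jovyan/file/sum89afbdf9da5eb1794f6976a3f...</td>\n    </tr>\n    <tr>\n      <th>1</th>\n      <td>&lt;built-in function sum&gt;</td>\n      <td>([3, 3],)</td>\n      <td>{}</td>\n      <td>6</td>\n      <td>{}</td>\n      <td>0.911247</td>\n      <td>/home/jovyan/file/sum0f7710227cda6456e5d071877...</td>\n    </tr>\n    <tr>\n      <th>2</th>\n      <td>&lt;built-in function sum&gt;</td>\n      <td>([1, 1],)</td>\n      <td>{}</td>\n      <td>2</td>\n      <td>{}</td>\n      <td>0.908284</td>\n      <td>/home/jovyan/file/sumf5ad27b855231a293ddd735a8...</td>\n    </tr>\n  </tbody>\n</table>\n</div>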
\n
"},"metadata":{}}],"execution_count":21},{"id":"68092479-e846-494a-9ac9-d9638b102bd8","cell_type":"markdown","source":"After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repetition of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process has concluded. ","metadata":{}},{"id":"34a9316d-577f-4a63-af14-736fb4e6b219","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n    print(os.listdir(cache_dir))\n    try:\n        shutil.rmtree(cache_dir)\n    except OSError:\n        pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":22},{"id":"1cea95b5-4110-444c-82af-fa6718bfa17f","cell_type":"markdown","source":"Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in the future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. 
","metadata":{}},{"id":"71a8a0be-a933-4e83-9da5-50da35e9975b","cell_type":"markdown","source":"### Dependencies\nMany scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `SingleNodeExecutor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `SingleNodeExecutor` which are resolved before submission. So the `SingleNodeExecutor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function.","metadata":{}},{"id":"d8b75a26-479d-405e-8895-a8d56b3f0f4b","cell_type":"code","source":"def calc_add(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":23},{"id":"36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078","cell_type":"markdown","source":"For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. 
\n\nThe important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `SingleNodeExecutor`.","metadata":{}},{"id":"35fd5747-c57d-4926-8d83-d5c55a130ad6","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future = 0\n    for i in range(1, 4):\n        future = exe.submit(calc_add, i, future)\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":24},{"id":"38e1bbb3-1028-4f50-93c1-d2427f399a7d","cell_type":"markdown","source":"As reusing existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting `plot_dependency_graph=True` during the initialization of the `SingleNodeExecutor` class.\n\nNo computation is executed when `plot_dependency_graph=True` is set. This parameter is for debugging only. \n\nInternally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. 
","metadata":{}},{"id":"f67470b5-af1d-4add-9de8-7f259ca67324","cell_type":"code","source":"with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n    future = 0\n    for i in range(1, 4):\n        future = exe.submit(calc_add, i, future)\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"None\n"},{"output_type":"display_data","data":{"text/plain":"","image/svg+xml":"\n\n\n\n\n0\n\ncalc_add\n\n\n\n1\n\ncalc_add\n\n\n\n0->1\n\n\nb\n\n\n\n2\n\ncalc_add\n\n\n\n1->2\n\n\nb\n\n\n\n3\n\n1\n\n\n\n3->0\n\n\na\n\n\n\n4\n\n0\n\n\n\n4->0\n\n\nb\n\n\n\n5\n\n2\n\n\n\n5->1\n\n\na\n\n\n\n6\n\n3\n\n\n\n6->2\n\n\na\n\n\n"},"metadata":{}}],"execution_count":25},{"id":"4c952a9d-2b58-401d-b7ec-fda740db774a","cell_type":"markdown","source":"## Advanced Scheduling\nGoing beyond just directed acyclic graphs (DAG) with one-to-many and many-to-one relationships, executorlib provides a number of advanced scheduling patterns. These are briefly introduced in the following.","metadata":{}},{"id":"4db2c87c-9a7d-4074-82d0-24357fa4f0e6","cell_type":"markdown","source":"### Runtime-dependent Batching \nTo maximize the throughput of dependent calculation tasks it is important to identify all tasks which can be executed at a given moment. Unfortunately, some of these dependencies can only be determined at run time, which is challenging for most schedulers. To demonstrate the runtime-dependent batching in executorlib we discuss the following example. We start with a group of ten tasks and then group them into batches of three for processing. Still the order of the tasks, and therefore which task belongs to which batch, is only determined at run time. 
\n\nFor simplicity, we just use a simple function which directly returns the input.","metadata":{}},{"id":"857a5e0f-50d6-45ec-aac7-a7151f36c19f","cell_type":"code","source":"def reply(i):\n return i","metadata":{"trusted":true},"outputs":[],"execution_count":26},{"id":"544dacab-5601-4a9b-814f-88560eafb079","cell_type":"markdown","source":"After the group of ten tasks is submitted their future objects are stored in a list named `future_individual_lst`. This list is then provided to the `batched()` function of the `SingleNodeExecutor()` to generate batches of tasks which are then provided to the `sum` function for further processing. The results of this second step are stored in the `future_group_lst`. Finally, the results of these future objects are evaluated in the third step. ","metadata":{}},{"id":"17d1354a-0943-4b62-9b0d-7d39c8df23f2","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n future_individual_lst = [\n exe.submit(reply, i) for i in range(10)\n ]\n future_group_lst = [\n exe.submit(sum, f) for f in exe.batched(future_individual_lst, n=3)\n ]\n print(sum([f.result() for f in future_group_lst]))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"45\n"}],"execution_count":27},{"id":"80e8b154-d659-4f3e-add2-701a454d770d","cell_type":"markdown","source":"### Split Future Objects\nIn analogy to the `batched()` function which combines multiple future objects in a batch for further processing, it is also necessary to split the future objects even before the evaluation is completed. Executorlib provides two utility functions, namely `split_future()` for tuples and lists and `get_item_from_future()` for dictionaries. 
\n\nStarting with a function which returns a tuple, named `get_a_tuple()`:","metadata":{}},{"id":"6ef541e3-a2be-4e05-9e77-65eaceff4248","cell_type":"code","source":"def get_a_tuple(i):\n    return \"a\", \"b\", i","metadata":{"trusted":true},"outputs":[],"execution_count":28},{"id":"dd3e4312-1594-4b3b-baf3-5b3121ca8910","cell_type":"markdown","source":"This function is submitted to the `SingleNodeExecutor()` and while in this case it directly returns the tuple, the evaluation would commonly take much longer. By splitting the output of the future object with the `split_future()` function, one future object is generated for each element of the tuple. This is enabled by providing the number of elements in the tuple as an additional parameter `n=3`. ","metadata":{}},{"id":"927c5931-d7fd-4bc1-ae0e-3dd2f4bc99ce","cell_type":"code","source":"from executorlib import split_future","metadata":{"trusted":true},"outputs":[],"execution_count":29},{"id":"64a9b46e-0de6-4abb-99d9-8aaa0c6cbccb","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future = exe.submit(get_a_tuple, 15)\n    f1, f2, f3 = split_future(future=future, n=3)\n    print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"a b 15\n"}],"execution_count":30},{"id":"a10d8f90-e790-4708-b193-90a7f014d699","cell_type":"markdown","source":"In analogy to the `split_future()` function for lists and tuples, the `get_item_from_future()` function returns one item of a dictionary which is returned by a function submitted to an `Executor()`. In this example the `get_a_dict()` function returns a dictionary. Again the dictionary is returned directly, while commonly the evaluation would take much longer. 
","metadata":{}},{"id":"51ec4dbf-d009-41c9-bc04-cf1b16fe782f","cell_type":"code","source":"def get_a_dict(i):\n    return {\"a\": 1, \"b\": 2, \"c\": i}","metadata":{"trusted":true},"outputs":[],"execution_count":31},{"id":"5c90ab75-2a6b-4fbe-a7ee-7f91b2b4af43","cell_type":"markdown","source":"The `get_a_dict()` function is submitted to the `SingleNodeExecutor()`, which returns a future object named `future_dict`. As we know that the `result()` of this future object `future_dict` returns a dictionary, we can already access the items of this dictionary with the `get_item_from_future()` function. The `get_item_from_future()` function takes a future object as input in addition to the `key` of the dictionary which should be accessed. As a result the `get_item_from_future()` function returns a future object for the value related to the key. In this example these future objects are named `f1`, `f2` and `f3`. These future objects are evaluated afterwards.","metadata":{}},{"id":"a41251d2-a840-4c12-9625-c2e0585c489b","cell_type":"code","source":"from executorlib import get_item_from_future","metadata":{"trusted":true},"outputs":[],"execution_count":32},{"id":"a6e75c5e-ad0c-4723-99c5-8125b63fbeaf","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    future_dict = exe.submit(get_a_dict, 15)\n    f1 = get_item_from_future(future=future_dict, key=\"a\")\n    f2 = get_item_from_future(future=future_dict, key=\"b\")\n    f3 = get_item_from_future(future=future_dict, key=\"c\")\n    print(f1.result(), f2.result(), f3.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"1 2 15\n"}],"execution_count":33},{"id":"a945c98d-dc73-4284-9d8c-52475a760f6b","cell_type":"markdown","source":"### Recursion\nIn addition to the `map()` function for parallel loops and the option to integrate `while` loops directly in the submission, it is sometimes helpful to use recursive algorithms to accelerate scientific simulation and analysis. 
To demonstrate the use of executorlib in combination with recursive algorithms, the quicksort algorithm is used as an example. \n\nThe quicksort algorithm is implemented in two parts. The first part is the `quick_sort()` function, which splits the input `sequence` into two sets, one with elements lower than or equal to the pivot element of the `sequence` and one with elements larger than the pivot. After the initial execution the `quick_sort()` function is applied again to both resulting sets until at most a single element remains in the `sequence`. In this case the `sequence` is returned directly. The pivot is defined as the last element of the `sequence`, as it is removed with `sequence.pop()`.\n\nTo simplify the submission of the `quick_sort()` function to the `SingleNodeExecutor()`, dictionaries are used as return types. These dictionaries contain either a single item with the key `\"result\"` when at most a single element remains in the list, or three items: `\"left\"` for the list of elements lower than or equal to the pivot, `\"right\"` for the list of elements larger than the pivot and `\"result\"` for the pivot itself. ","metadata":{}},{"id":"08b0c0a1-34ad-4b2a-847c-fe9c96956b27","cell_type":"code","source":"def quick_sort(sequence):\n    length = len(sequence)\n    if length <= 1:\n        return {\"result\": sequence}\n    else:\n        pivot = sequence.pop()\n\n        greater_items = []\n        lesser_items = []\n\n        for item in sequence:\n            if item > pivot:\n                greater_items.append(item)\n            else:\n                lesser_items.append(item)\n\n        return {\"left\": lesser_items, \"right\": greater_items, \"result\": [pivot]}","metadata":{"trusted":true},"outputs":[],"execution_count":34},{"id":"54a8a765-0b9f-424a-86e5-8024c4d59018","cell_type":"markdown","source":"To enable the recursive submission to the `SingleNodeExecutor()`, the second part, a `recusive_submit()` function, is defined using the [asynchronous IO module](https://docs.python.org/3/library/asyncio.html) of the Python standard library, as indicated by the `async` keyword ahead of the function definition. It takes a `function`, the `sequence` and the `executor` as inputs. 
As a first step the function is submitted to the `executor` and the execution is halted until the first result is available by wrapping the future object using the `wrap_future()` function of the `asyncio` package and applying the `await` keyword. Then the corresponding result dictionary `result_dict` is evaluated: If the `result_dict` contains more than one key, then `recusive_submit()` is called for both the left side with values lower than or equal to the pivot element and the right side with values higher than the pivot element. Both calls are passed to `asyncio.gather()` and awaited together, which enables the parallel execution of both branches at the same time. Afterwards the results of both sides are combined with the pivot element. If the dictionary only contains a single key, its result is returned directly. ","metadata":{}},{"id":"4e142829-c9de-4969-ab3c-97f3e1ada90c","cell_type":"code","source":"import asyncio\n\nasync def recusive_submit(function, sequence, executor):\n    result_dict = await asyncio.wrap_future(executor.submit(function, sequence))\n    if len(result_dict) > 1:\n        # evaluate both branches concurrently\n        left, right = await asyncio.gather(\n            recusive_submit(function=function, sequence=result_dict[\"left\"], executor=executor),\n            recusive_submit(function=function, sequence=result_dict[\"right\"], executor=executor),\n        )\n        return left + result_dict[\"result\"] + right\n    else:\n        return result_dict[\"result\"]","metadata":{"trusted":true},"outputs":[],"execution_count":35},{"id":"531a8f7d-5b21-46d0-b632-32a1568571c8","cell_type":"code","source":"with SingleNodeExecutor() as exe:\n    loop = asyncio.get_event_loop()\n    task = loop.create_task(recusive_submit(function=quick_sort, sequence=[0,9,3,8,2,7,5], executor=exe))\n    print(await task)","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[0, 2, 3, 5, 7, 8, 9]\n"}],"execution_count":36},{"id":"725c8ac2-27e4-495b-b061-1fd6053f4c0b","cell_type":"markdown","source":"## Testing and Debugging\nThe up-scaling of Python functions from a single 
workstation to a High Performance Computer (HPC) can be challenging, so executorlib provides a number of debugging utilities to help you optimize your functions for execution with executorlib. ","metadata":{}},{"id":"2c96b460-ebd2-4c8a-a5e1-922a860808ab","cell_type":"markdown","source":"### Measure Data Transferred \nTransferring a large amount of data between two processes requires additional resources, so it is helpful to measure the data transferred between the frontend and backend process. This is achieved by setting the `log_obj_size` parameter to `True`:","metadata":{}},{"id":"71905acf-2fef-4738-82ae-89e35e2e3d2d","cell_type":"code","source":"from executorlib import SingleNodeExecutor\n\nwith SingleNodeExecutor(log_obj_size=True) as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stderr","output_type":"stream","text":"Send dictionary of size: 101\nReceived dictionary of size: 59\nSend dictionary of size: 69\nReceived dictionary of size: 58\n"},{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":37},{"id":"0e9f7192-2b7d-4caf-b2b8-ae9cb060e90f","cell_type":"markdown","source":"### Write Log Files\nLibraries like executorlib are commonly used to sample a large parameter space. In this case it is possible that, out of a large number of parameters, one combination throws an error. This error can be logged in a file which also contains the function and input parameters using the `\"error_log_file\"` parameter in the `resource_dict`. 
This allows the log file to be changed on a per-function basis.","metadata":{}},{"id":"2810acd5-a46e-4d85-b447-945248ffca15","cell_type":"code","source":"from executorlib import SingleNodeExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":38},{"id":"ef6b87a7-361e-4a3f-bad3-72ad87968e8c","cell_type":"code","source":"def my_funct(i, j):\n    if i == 2 and j == 2:\n        raise ValueError()\n    else:\n        return i * j + i + j","metadata":{"trusted":true},"outputs":[],"execution_count":39},{"id":"209ff76b-913a-46f7-9fff-250e207898b2","cell_type":"markdown","source":"A try and except statement is added to prevent the Jupyter notebook from crashing:","metadata":{}},{"id":"30381a00-30cc-45a2-82ff-17371339f5c7","cell_type":"code","source":"with SingleNodeExecutor(resource_dict={\"error_log_file\": \"error.out\"}) as exe:\n    future_lst = []\n    for i in range(4):\n        for j in range(4):\n            future_lst.append(exe.submit(my_funct, i=i, j=j))\n    try:\n        print([f.result() for f in future_lst])\n    except ValueError:\n        pass","metadata":{"trusted":true},"outputs":[],"execution_count":40},{"id":"36bfc556-aa86-4b50-972b-4b8f68ceec3d","cell_type":"markdown","source":"The log file is a basic text file, so it can be read with any kind of file utility. 
The important part is that the log file contains not only the error message but in addition also the function name and the input parameters in the case `kwargs: {'i': 2, 'j': 2}` which helps for future debugging of the sampling function.","metadata":{}},{"id":"a61c06fc-3029-48ed-895f-9f6c065b7c47","cell_type":"code","source":"with open(\"error.out\") as f:\n content = f.readlines()","metadata":{"trusted":true},"outputs":[],"execution_count":41},{"id":"6a6cd10f-6fe3-4a17-a8e6-2536feb9a11a","cell_type":"code","source":"content","metadata":{"trusted":true},"outputs":[{"execution_count":42,"output_type":"execute_result","data":{"text/plain":"['function: \\n',\n 'args: ()\\n',\n \"kwargs: {'i': 2, 'j': 2}\\n\",\n 'Traceback (most recent call last):\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/backend/interactive_serial.py\", line 56, in main\\n',\n ' output = call_funct(input_dict=input_dict, funct=None, memory=memory)\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 33, in call_funct\\n',\n ' return funct(input_dict[\"fn\"], *input_dict[\"args\"], **input_dict[\"kwargs\"])\\n',\n ' File \"/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py\", line 22, in funct\\n',\n ' return args[0].__call__(*args[1:], **kwargs)\\n',\n ' ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^\\n',\n ' File \"/tmp/ipykernel_463/3167739528.py\", line 3, in my_funct\\n',\n 'ValueError\\n']"},"metadata":{}}],"execution_count":42},{"id":"5aff120a-317f-47d4-9639-8eccfb136117","cell_type":"code","source":"import os\n\nif os.path.exists(\"error.out\"):\n os.remove(\"error.out\")","metadata":{"trusted":true},"outputs":[],"execution_count":43},{"id":"44cd41d7-4417-429e-8481-f2a49e5f769c","cell_type":"markdown","source":"### TestClusterExecutor\nWhile the `SingleNodeExecutor` internally behaves very similar to the `FluxJobExecutor` and `SlurmJobExecutor` 
the `FluxClusterExecutor` and `SlurmClusterExecutor` behave very differently, as they use the file system to exchange information rather than socket-based communication. This can complicate debugging. To address this challenge executorlib provides the `TestClusterExecutor`, which can be executed on a local workstation just like the `SingleNodeExecutor` but in the background uses the same file-based communication as the `SlurmClusterExecutor` and the `FluxClusterExecutor`:","metadata":{}},{"id":"2ae29fb0-54e5-468f-8727-a179b6ed363e","cell_type":"code","source":"from executorlib.api import TestClusterExecutor\n\nwith TestClusterExecutor(cache_directory=\"test\") as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":44}]} \ No newline at end of file diff --git a/notebooks/2-hpc-cluster.ipynb b/notebooks/2-hpc-cluster.ipynb index d45cb9f58..b972f5d96 100644 --- a/notebooks/2-hpc-cluster.ipynb +++ b/notebooks/2-hpc-cluster.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.13.13","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. 
Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. ","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. On shared HPC systems users cannot access compute nodes directly — SLURM acts as the resource controller, accepting job requests, managing the queue, and assigning work to nodes when resources become free.\n\nIn the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command, this is in contrast to the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) which internally uses the [srun](https://slurm.schedmd.com/srun.html) command.\n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). 
It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"slurm-background-001","cell_type":"markdown","source":"### Background\n\nThree commands cover the day-to-day SLURM workflow:\n\n**`sbatch`** submits a batch script from the login node. The script carries resource directives on lines starting with `#SBATCH`:\n\n```bash\n#!/bin/bash\n#SBATCH --job-name=my_job\n#SBATCH --output=my_job.out\n#SBATCH --ntasks=4 # total MPI ranks\n#SBATCH --cpus-per-task=1 # CPU threads per rank\n#SBATCH --time=00:30:00 # wall-clock limit (HH:MM:SS)\n#SBATCH --partition=regular\n\nsrun --mpi=pmix python my_script.py\n```\n\n```bash\nsbatch job.sh # submit — returns a job ID immediately\n```\n\nKey `#SBATCH` directives:\n\n| Directive | Meaning |\n|---|---|\n| `--ntasks=N` | Total MPI ranks (processes) |\n| `--cpus-per-task=N` | CPU threads available to each rank |\n| `--mem=NG` | Memory per node (e.g. `8G`) |\n| `--time=HH:MM:SS` | Maximum wall-clock time |\n| `--partition=name` | Queue / partition to target |\n| `--dependency=afterok:JOBID` | Run only after another job succeeds |\n\n**`srun`** launches parallel tasks *inside* an existing allocation. 
Multiple `srun` calls can run concurrently using shell backgrounding:\n\n```bash\nsrun --mpi=pmix -n 4 python task_a.py &\nsrun --mpi=pmix -n 4 python task_b.py &\nwait # block until both finish\n```\n\n**`squeue`** and **`sacct`** let you inspect the queue and verify resource assignments:\n\n```bash\nsqueue --me # your running/pending jobs\nsacct -j 12345 --format=JobID,State,AllocCPUS,Elapsed # accounting for job 12345\n```\n\nCommon `squeue` state codes: `PD` (pending), `R` (running), `CG` (completing).","metadata":{}},{"id":"slurm-mpi-001","cell_type":"markdown","source":"### MPI-parallel Python\n\nThe [Message Passing Interface (MPI)](https://www.mpi-forum.org) is the dominant parallelisation standard on HPC systems. [`mpi4py`](https://mpi4py.readthedocs.io) provides Python bindings. A minimal example:\n\n```python\n# script.py\nfrom mpi4py import MPI\ncomm = MPI.COMM_WORLD\nprint(f\"rank {comm.Get_rank()} of {comm.Get_size()}\")\n```\n\n```bash\nsrun --mpi=pmix -n 4 python script.py\n```\n\nWhen multiple independent groups of ranks need to run inside one allocation there are two approaches:\n\n| Approach | How | Cross-group communication |\n|---|---|---|\n| Multiple `srun` calls | Each `srun` gets its own communicator | Not possible |\n| `MPI_Comm_split` | One `srun`, split in Python | Possible via `MPI.COMM_WORLD` |\n\n```python\n# communicator splitting — 8 ranks split into two groups of 4\ncomm = MPI.COMM_WORLD\ncolor = comm.Get_rank() // 4 # group 0 or group 1\nsub_comm = comm.Split(color)\n```","metadata":{}},{"id":"db7760e8-35a6-4a1c-8b0f-410b536c3835","cell_type":"markdown","source":"### SlurmClusterExecutor\n\n```python\nfrom executorlib import SlurmClusterExecutor\n```","metadata":{}},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement 
to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`.","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --ntasks={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n \"partition\": \"s.cmfe\",\n })\n print(future.result())\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"The template uses 
[Jinja2](https://jinja.palletsprojects.com) syntax. executorlib fills `{{cores}}` into `--ntasks`, so `cores=1` requests one serial process and `cores=4` requests four MPI ranks. With `pmi_mode=\"pmix\"` the executor additionally wraps the function call in `srun --mpi=pmix -n `:\n\n```python\ndef mpi_calc(i):\n from mpi4py import MPI\n comm = MPI.COMM_WORLD\n return {\"rank\": comm.Get_rank(), \"size\": comm.Get_size(), \"input\": i}\n\nwith SlurmClusterExecutor(pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n future = exe.submit(\n mpi_calc, 42,\n resource_dict={\"cores\": 4, \"partition\": \"regular\", \"run_time_max\": 120})\n print(future.result())\n```\n\nWith these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ","metadata":{}},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\n[Flux](http://flux-framework.org) is a modern HPC resource manager developed at Lawrence Livermore National Laboratory (LLNL). On many systems it runs as a **secondary scheduler inside a SLURM allocation**, enabling fine-grained hierarchical task distribution. Unlike SLURM, Flux can also be installed locally via conda — making it especially suitable for demonstrations, testing, and continuous integration:\n\n```bash\nconda install -c conda-forge flux-core\nflux start # launch a local Flux instance\n```\n\nThis simple installation is explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). 
The features demonstrated below using Flux apply equally to SLURM.","metadata":{}},{"id":"flux-background-001","cell_type":"markdown","source":"### Background\n\nThe key Flux commands map closely onto their SLURM equivalents:\n\n| Flux command | SLURM equivalent | Description |\n|---|---|---|\n| `flux resource list` | `sinfo` | Show available nodes, cores, and GPUs |\n| `flux jobs -a` | `squeue` | List all jobs (running and completed) |\n| `flux submit` | `sbatch` | Submit a non-blocking job; returns a job ID immediately |\n| `flux run` | `srun` | Launch a blocking job; waits for completion |\n| `flux job attach ` | — | Stream output of a previously submitted job |\n\n```bash\n# submit a non-blocking 4-rank MPI job\nflux submit -o pmi=pmix --ntasks=4 python script.py\n\n# run a blocking 4-rank MPI job (waits for output)\nflux run -o pmi=pmix -n 4 python script.py\n```\n\nThe `-o pmi=pmix` flag matches what SLURM's `--mpi=pmix` provides — the same `mpi4py` scripts run unchanged under both schedulers.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"0f7fc37a-1248-492d-91ab-9db1d737eaee","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"ae308683-6083-4e78-afc2-bff6c6dc297b","cell_type":"code","source":"from executorlib import FluxClusterExecutor\n\nwith FluxClusterExecutor(cache_directory=\"./file\") as exe:\n future = 0\n for i in range(4, 8):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"22\n"}],"execution_count":2},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.","metadata":{}},{"id":"eded3a0f-e54f-44f6-962f-eedde4bd2158","cell_type":"code","source":"def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, 
rank\n","metadata":{"trusted":true},"outputs":[],"execution_count":3},{"id":"669b05df-3cb2-4f69-9d94-8b2442745ebb","cell_type":"code","source":"with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"(3, 1, 0)\n"}],"execution_count":4},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"slurm-job-executor-001","cell_type":"markdown","source":"## Combine both\n\nWhile `SlurmClusterExecutor` submits each Python function as a separate `sbatch` job from the login node, the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) (`SlurmJobExecutor`) runs **inside** an already-running SLURM allocation and dispatches tasks as `srun` steps — no new jobs enter the queue.\n\n```python\nfrom executorlib import SlurmJobExecutor\n\n# 
This code runs inside an existing SLURM job\nwith SlurmJobExecutor(max_workers=4) as exe:\n futures = [exe.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(4)]\n print([f.result() for f in futures])\n```\n\nThe two executor types can be nested: a function submitted via `SlurmClusterExecutor` (running as an `sbatch` job) can itself create a `SlurmJobExecutor` to parallelise sub-tasks as `srun` steps within the same allocation:\n\n```python\nfrom executorlib import SlurmClusterExecutor, SlurmJobExecutor\n\ndef parallel_workflow(n):\n with SlurmJobExecutor(max_workers=n) as inner:\n futures = [inner.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(n)]\n return [f.result() for f in futures]\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as outer:\n future = outer.submit(\n parallel_workflow, 4,\n resource_dict={\"cores\": 1, \"partition\": \"regular\", \"run_time_max\": 300})\n print(future.result())\n```\n\nThe `sacct` output for such a job will show the outer `sbatch` job together with numbered `srun` steps (e.g. `12345.0`, `12345.1`, …), all completing within the same allocation.\n\n| Executor | Scheduler command | Typical use |\n|---|---|---|\n| `SlurmClusterExecutor` | `sbatch` (one job per function) | Submit from login node |\n| `SlurmJobExecutor` | `srun` steps within current job | Inside an existing allocation |\n| `FluxClusterExecutor` | `flux submit` | Flux-managed allocation or local testing |","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"## Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. 
The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"f537b4f6-cc98-43da-8aca-94a823bcbcbd","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['add_functcb272924f36cbaa9ac79f8d42b6771c8', 'add_funct67b8245bf71c3c6dcb2018663939c72d', 'add_funct144ecf19b5020fccad214df3f4bdabd0', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667', 'add_funct67b8245bf71c3c6dcb2018663939c72d_o.h5', 'add_funct144ecf19b5020fccad214df3f4bdabd0_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667_o.h5', 'add_functcb272924f36cbaa9ac79f8d42b6771c8_o.h5']\n"}],"execution_count":5},{"id":"3efc9f5d-fbf9-4a85-8963-5711a453130d","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file +{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.13.13","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero 
message queue](https://zeromq.org) but instead store the Python functions on the file system and use the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode is using the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependencies of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package is covered in the installation section. ","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. On shared HPC systems users cannot access compute nodes directly. Instead, SLURM acts as the resource controller, accepting job requests, managing the queue, and assigning work to nodes when resources become free.\n\nIn the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command. This is in contrast to the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) which internally uses the [srun](https://slurm.schedmd.com/srun.html) command.\n\nThe connection to the job scheduler is based on the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io). 
It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"slurm-background-001","cell_type":"markdown","source":"### Background\n\nThree commands cover the day-to-day SLURM workflow:\n\n**`sbatch`** submits a batch script from the login node. The script carries resource directives on lines starting with `#SBATCH`:\n\n```bash\n#!/bin/bash\n#SBATCH --job-name=my_job\n#SBATCH --output=my_job.out\n#SBATCH --ntasks=4 # total MPI ranks\n#SBATCH --cpus-per-task=1 # CPU threads per rank\n#SBATCH --time=00:30:00 # wall-clock limit (HH:MM:SS)\n#SBATCH --partition=regular\n\nsrun --mpi=pmix python my_script.py\n```\n\n```bash\nsbatch job.sh # submit — returns a job ID immediately\n```\n\nKey `#SBATCH` directives:\n\n| Directive | Meaning |\n|---|---|\n| `--ntasks=N` | Total MPI ranks (processes) |\n| `--cpus-per-task=N` | CPU threads available to each rank |\n| `--mem=NG` | Memory per node (e.g. `8G`) |\n| `--time=HH:MM:SS` | Maximum wall-clock time |\n| `--partition=name` | Queue / partition to target |\n| `--dependency=afterok:JOBID` | Run only after another job succeeds |\n\n**`srun`** launches parallel tasks *inside* an existing allocation. 
Multiple `srun` calls can run concurrently using shell backgrounding:\n\n```bash\nsrun --mpi=pmix -n 4 python task_a.py &\nsrun --mpi=pmix -n 4 python task_b.py &\nwait # block until both finish\n```\n\n**`squeue`** and **`sacct`** let you inspect the queue and verify resource assignments:\n\n```bash\nsqueue --me # your running/pending jobs\nsacct -j 12345 --format=JobID,State,AllocCPUS,Elapsed # accounting for job 12345\n```\n\nCommon `squeue` state codes: `PD` (pending), `R` (running), `CG` (completing).","metadata":{}},{"id":"slurm-mpi-001","cell_type":"markdown","source":"### MPI-parallel Python\n\nThe [Message Passing Interface (MPI)](https://www.mpi-forum.org) is the dominant parallelisation standard on HPC systems. [`mpi4py`](https://mpi4py.readthedocs.io) provides Python bindings. A minimal example:\n\n```python\n# script.py\nfrom mpi4py import MPI\ncomm = MPI.COMM_WORLD\nprint(f\"rank {comm.Get_rank()} of {comm.Get_size()}\")\n```\n\n```bash\nsrun --mpi=pmix -n 4 python script.py\n```\n\nWhen multiple independent groups of ranks need to run inside one allocation there are two approaches:\n\n| Approach | How | Cross-group communication |\n|---|---|---|\n| Multiple `srun` calls | Each `srun` gets its own communicator | Not possible |\n| `MPI_Comm_split` | One `srun`, split in Python | Possible via `MPI.COMM_WORLD` |\n\n```python\n# communicator splitting — 8 ranks split into two groups of 4\ncomm = MPI.COMM_WORLD\ncolor = comm.Get_rank() // 4 # group 0 or group 1\nsub_comm = comm.Split(color)\n```","metadata":{}},{"id":"db7760e8-35a6-4a1c-8b0f-410b536c3835","cell_type":"markdown","source":"### SlurmClusterExecutor\n\n```python\nfrom executorlib import SlurmClusterExecutor\n```","metadata":{}},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement 
to specify the cache directory using `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n    print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictionary `resource_dict` either for each function in the `submit()` call or during the initialization of the `SlurmClusterExecutor`.","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --ntasks={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n    future = exe.submit(\n        sum, [4, 4], \n        resource_dict={\n            \"submission_template\": submission_template, \n            \"run_time_max\": 180, # in seconds \n            \"partition\": \"s.cmfe\",\n        })\n    print(future.result())\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"The template uses
[Jinja2](https://jinja.palletsprojects.com) syntax. executorlib fills `{{cores}}` into `--ntasks`, so `cores=1` requests one serial process and `cores=4` requests four MPI ranks. With `pmi_mode=\"pmix\"` the executor additionally wraps the function call in `srun --mpi=pmix -n `:\n\n```python\ndef mpi_calc(i):\n    from mpi4py import MPI\n    comm = MPI.COMM_WORLD\n    return {\"rank\": comm.Get_rank(), \"size\": comm.Get_size(), \"input\": i}\n\nwith SlurmClusterExecutor(pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n    future = exe.submit(\n        mpi_calc, 42,\n        resource_dict={\"cores\": 4, \"partition\": \"regular\", \"run_time_max\": 120})\n    print(future.result())\n```\n\nWith these options executorlib in combination with the SLURM job scheduler provides a lot of flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ","metadata":{}},{"cell_type":"markdown","id":"e1299068-990e-4fd3-b926-f6415b4e9d75","metadata":{},"source":["### Verifying the Resource Assignment\n","After the submission it is often useful to confirm that the job scheduler actually assigned the requested resources. For the `SlurmClusterExecutor` the SLURM job identifier is stored in the cache and can be retrieved with the `get_cache_data()` function. This job identifier `queue_id` can then be passed to the SLURM [sacct](https://slurm.schedmd.com/sacct.html) command to inspect the accounting record of the job:\n","\n","```python\n","from executorlib import get_cache_data\n","import subprocess\n","\n","for entry in get_cache_data(cache_directory=\"./cache\"):\n","    if \"calc\" in str(entry[\"function\"]):\n","        job_id = entry[\"queue_id\"]\n","        print(subprocess.check_output(\n","            [\"sacct\", \"-j\", str(job_id), \"--format=JobID,State,AllocCPUS,Elapsed\"],\n","            universal_newlines=True,\n","        ))\n","```\n","\n","The `AllocCPUS` column reports the number of CPU cores SLURM allocated for the function.
In addition to the `queue_id` each cache entry also contains the full `resource_dict`, the runtime and the path of the result file, which together provide a complete audit trail of the submission - this is the same information returned by the `get_cache_data()` function demonstrated for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache)."]},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\n[Flux](http://flux-framework.org) is a modern HPC resource manager developed at Lawrence Livermore National Laboratory (LLNL). On many systems it runs as a **secondary scheduler inside a SLURM allocation**, enabling fine-grained hierarchical task distribution. Unlike SLURM, Flux can also be installed locally via conda — making it especially suitable for demonstrations, testing, and continuous integration:\n\n```bash\nconda install -c conda-forge flux-core\nflux start # launch a local Flux instance\n```\n\nThis simple installation is explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). 
The features demonstrated below using Flux apply equally to SLURM.","metadata":{}},{"id":"flux-background-001","cell_type":"markdown","source":"### Background\n\nThe key Flux commands map closely onto their SLURM equivalents:\n\n| Flux command | SLURM equivalent | Description |\n|---|---|---|\n| `flux resource list` | `sinfo` | Show available nodes, cores, and GPUs |\n| `flux jobs -a` | `squeue` | List all jobs (running and completed) |\n| `flux submit` | `sbatch` | Submit a non-blocking job; returns a job ID immediately |\n| `flux run` | `srun` | Launch a blocking job; waits for completion |\n| `flux job attach ` | — | Stream output of a previously submitted job |\n\n```bash\n# submit a non-blocking 4-rank MPI job\nflux submit -o pmi=pmix --ntasks=4 python script.py\n\n# run a blocking 4-rank MPI job (waits for output)\nflux run -o pmi=pmix -n 4 python script.py\n```\n\nThe `-o pmi=pmix` flag matches what SLURM's `--mpi=pmix` provides — the same `mpi4py` scripts run unchanged under both schedulers.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the HPC submission these dependencies are communicated to the job scheduler, which makes it possible to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed, and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"0f7fc37a-1248-492d-91ab-9db1d737eaee","cell_type":"code","source":"def add_funct(a, b):\n    return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"ae308683-6083-4e78-afc2-bff6c6dc297b","cell_type":"code","source":"from executorlib import FluxClusterExecutor\n\nwith FluxClusterExecutor(cache_directory=\"./file\") as exe:\n    future = 0\n    for i in range(4, 8):\n        future = exe.submit(add_funct, i, future)\n    print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"22\n"}],"execution_count":2},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.","metadata":{}},{"id":"eded3a0f-e54f-44f6-962f-eedde4bd2158","cell_type":"code","source":"def calc(i):\n    from mpi4py import MPI\n\n    size = MPI.COMM_WORLD.Get_size()\n    rank = MPI.COMM_WORLD.Get_rank()\n    return i, size,
rank\n","metadata":{"trusted":true},"outputs":[],"execution_count":3},{"id":"669b05df-3cb2-4f69-9d94-8b2442745ebb","cell_type":"code","source":"with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"(3, 1, 0)\n"}],"execution_count":4},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"cell_type":"markdown","id":"c9c46c64-d097-49d3-ba79-ecb871c8f4da","metadata":{},"source":["## Disconnecting and Reconnecting\n","A key advantage of the HPC Cluster Executors over the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) is that the Python process which created the executor does not need to stay alive while the submitted functions are running. 
As the functions are submitted as individual scheduler jobs and the results are stored on the file system, the Python process can be closed after the submission - for example to log out of the login node overnight - and the results can be reloaded later. This is controlled with the `shutdown()` method of the executor, which provides the same `wait` and `cancel_futures` parameters as the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown) of the Python standard library.\n","\n","To submit a set of functions and disconnect without waiting, the executor is created without a `with` statement and `shutdown()` is called with `wait=False` and `cancel_futures=False`:\n","\n","```python\n","from executorlib import SlurmClusterExecutor\n","\n","exe = SlurmClusterExecutor(cache_directory=\"./cache\")\n","future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n","exe.shutdown(wait=False, cancel_futures=False)\n","```\n","\n","The submitted jobs remain in the queue of the job scheduler and continue to run. At a later point - even from a new Python process - the same functions are submitted again using the same `cache_directory`. executorlib recognises the cached results and returns them immediately instead of submitting the functions a second time:\n","\n","```python\n","from executorlib import SlurmClusterExecutor\n","\n","with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n"," future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n"," print([f.result() for f in future_lst])\n","```\n","\n","The behaviour of the `shutdown()` method is summarized in the following table:\n","\n","| `shutdown()` call | Effect |\n","|---|---|\n","| `shutdown(wait=True)` | Wait until all submitted functions are finished before continuing - this is the default and also what the `with` statement does. 
|\n","| `shutdown(wait=False, cancel_futures=False)` | Return immediately and leave the submitted jobs running in the scheduler queue - used to disconnect from a running workflow. |\n","| `shutdown(wait=False, cancel_futures=True)` | Cancel the submitted jobs which have not started or finished yet. |\n","\n","This disconnect-and-reconnect capability is available for both the `SlurmClusterExecutor` and the `FluxClusterExecutor`, as both communicate via the file system rather than via sockets."]},{"id":"slurm-job-executor-001","cell_type":"markdown","source":"## Combine both\n\nWhile `SlurmClusterExecutor` submits each Python function as a separate `sbatch` job from the login node, the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) (`SlurmJobExecutor`) runs **inside** an already-running SLURM allocation and dispatches tasks as `srun` steps - no new jobs enter the queue.\n\n```python\nfrom executorlib import SlurmJobExecutor\n\n# This code runs inside an existing SLURM job\nwith SlurmJobExecutor(max_workers=4) as exe:\n    futures = [exe.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(4)]\n    print([f.result() for f in futures])\n```\n\nThe two executor types can be nested: a function submitted via `SlurmClusterExecutor` (running as an `sbatch` job) can itself create a `SlurmJobExecutor` to parallelise sub-tasks as `srun` steps within the same allocation:\n\n```python\nfrom executorlib import SlurmClusterExecutor, SlurmJobExecutor\n\ndef parallel_workflow(n):\n    with SlurmJobExecutor(max_workers=n) as inner:\n        futures = [inner.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(n)]\n        return [f.result() for f in futures]\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as outer:\n    future = outer.submit(\n        parallel_workflow, 4,\n        resource_dict={\"cores\": 1, \"partition\": \"regular\", \"run_time_max\": 300})\n    print(future.result())\n```\n\nThe `sacct` output for such a job will show the outer `sbatch`
job together with numbered `srun` steps (e.g. `12345.0`, `12345.1`, …), all completing within the same allocation.\n\n| Executor | Scheduler command | Typical use |\n|---|---|---|\n| `SlurmClusterExecutor` | `sbatch` (one job per function) | Submit from login node |\n| `SlurmJobExecutor` | `srun` steps within current job | Inside an existing allocation |\n| `FluxClusterExecutor` | `flux submit` | Flux-managed allocation or local testing |","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"## Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The Python functions are serialized with the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library and stored in binary format. This format is designed for caching but not for long-term storage.
The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"f537b4f6-cc98-43da-8aca-94a823bcbcbd","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n    print(os.listdir(cache_dir))\n    try:\n        shutil.rmtree(cache_dir)\n    except OSError:\n        pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['add_functcb272924f36cbaa9ac79f8d42b6771c8', 'add_funct67b8245bf71c3c6dcb2018663939c72d', 'add_funct144ecf19b5020fccad214df3f4bdabd0', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667', 'add_funct67b8245bf71c3c6dcb2018663939c72d_o.h5', 'add_funct144ecf19b5020fccad214df3f4bdabd0_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667_o.h5', 'add_functcb272924f36cbaa9ac79f8d42b6771c8_o.h5']\n"}],"execution_count":5},{"id":"3efc9f5d-fbf9-4a85-8963-5711a453130d","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file diff --git a/notebooks/3-hpc-job.ipynb b/notebooks/3-hpc-job.ipynb index 210771799..7985423df 100644 --- a/notebooks/3-hpc-job.ipynb +++ b/notebooks/3-hpc-job.ipynb @@ -64,6 +64,33 @@ "In this Python script `` the `\"flux_allocation\"` backend can be used." ] }, + { + "cell_type": "markdown", + "id": "f58c020a-f78a-46ee-818c-0ba1c57c873d", + "metadata": {}, + "source": [ + "When the Python functions are submitted from the login node with the [SlurmClusterExecutor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#slurm), a [flux](https://flux-framework.org/) instance can also be started automatically for each submitted job, rather than wrapping the script manually.
This is achieved by wrapping the `{{command}}` of the submission template with `srun flux start`, so each `sbatch` job boots its own flux instance:\n", + "\n", + "```python\n", + "submission_template = \"\"\"\\\n", + "#!/bin/bash\n", + "#SBATCH --output=time.out\n", + "#SBATCH --job-name={{job_name}}\n", + "#SBATCH --chdir={{working_directory}}\n", + "#SBATCH --get-user-env=L\n", + "#SBATCH --partition={{partition}}\n", + "{%- if run_time_max %}\n", + "#SBATCH --time={{ [1, run_time_max // 60]|max }}\n", + "{%- endif %}\n", + "#SBATCH --cpus-per-task={{cores}}\n", + "\n", + "srun --cpus-per-task={{cores}} flux start {{command}}\n", + "\"\"\"\n", + "```\n", + "\n", + "Inside the function which is submitted to the `SlurmClusterExecutor` a nested `FluxJobExecutor` can then distribute many small tasks within the flux instance of the allocation. This combines the robustness of the file based submission of the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) with the low overhead of the socket based execution of the HPC Job Executor, and is the recommended setup to distribute a large number of tasks within a single SLURM allocation - see the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#flux-framework-as-secondary-scheduler)." + ] + }, { "cell_type": "markdown", "id": "68be70c3-af18-4165-862d-7022d35bf9e4",