
pympipool's Introduction

pympipool


Challenges

In high performance computing (HPC) the Python programming language is commonly used as a high-level language to orchestrate the coupling of scientific applications. Still, the efficient usage of highly parallel HPC clusters remains challenging, primarily in three aspects:

  • Communication: Distributing Python function calls over hundreds of compute nodes and gathering the results on a shared file system is technically possible, but highly inefficient. A socket-based communication approach is preferable.
  • Resource Management: Assigning Python functions to GPUs or executing Python functions on multiple CPUs using the message passing interface (MPI) requires major modifications to the Python workflow.
  • Integration: Existing workflow libraries implement a secondary job management layer on the Python level rather than leveraging the existing infrastructure provided by the job scheduler of the HPC.

pympipool is ...

In a given HPC allocation the pympipool library addresses these challenges by extending the Executor interface of the standard Python library to support resource assignment in the HPC context. Computing resources can either be assigned on a per-function-call basis or as a block allocation on a per-Executor basis. The pympipool library is built on top of the flux-framework to enable fine-grained resource assignment. In addition, the Simple Linux Utility for Resource Management (SLURM) is supported as an alternative queuing system, and for workstation installations pympipool can be installed without a job scheduler.

pympipool is not ...

The pympipool library is not designed to request an allocation from the job scheduler of an HPC. Instead, within a given allocation from the job scheduler, the pympipool library can be employed to distribute a series of Python function calls over the available computing resources to achieve maximum computing resource utilization.

Example

The following example illustrates how pympipool can be used to distribute a series of MPI-parallel function calls within a queuing system allocation. example.py:

import flux.job
from pympipool import Executor

def calc(i):
    from mpi4py import MPI
    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank

with flux.job.FluxExecutor() as flux_exe:
    with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:
        fs = exe.submit(calc, 3)
        print(fs.result())

This example can be executed using:

python example.py

Which returns:

>>> [(3, 2, 0), (3, 2, 1)]

The important part of this example is that mpi4py is only used in the calc() function, not in the Python script itself. Consequently, it is not necessary to call the script with mpiexec; a call with the regular Python interpreter is sufficient. This highlights how pympipool allows users to parallelize one function at a time without having to convert their whole workflow to use mpi4py. The same code can also be executed directly inside a Jupyter notebook, which enables an interactive development process.

The interface of the standard concurrent.futures.Executor is extended by adding the option cores_per_worker=2 to assign multiple MPI ranks to each function call. To create two workers, the maximum number of cores can be increased to max_cores=4. In this case each worker receives two cores, resulting in a total of four CPU cores being utilized.
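
Putting this together, a block-allocation variant of the example above might look like the following sketch. It reuses the calc() function from example.py; the submission loop and the number of tasks are illustrative and not part of the original example:

import flux.job
from pympipool import Executor

def calc(i):
    from mpi4py import MPI
    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank

# Four cores in total, two cores per worker: two MPI-parallel workers
# execute function calls concurrently.
with flux.job.FluxExecutor() as flux_exe:
    with Executor(max_cores=4, cores_per_worker=2, executor=flux_exe) as exe:
        futures = [exe.submit(calc, i) for i in range(4)]
        print([fs.result() for fs in futures])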

After submitting the function calc() with the corresponding parameter to the executor, exe.submit(calc, 3), a Python concurrent.futures.Future object is returned. Consequently, the pympipool.Executor can be used as a drop-in replacement for the concurrent.futures.Executor, which allows the user to add parallelism to their workflow one function at a time.

Disclaimer

While we try to develop a stable and reliable software library, the development remains an open-source project under the BSD 3-Clause License without any warranties:

BSD 3-Clause License

Copyright (c) 2022, Jan Janssen
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote products derived from
  this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Documentation

pympipool's People

Contributors

dependabot[bot], jameshcorbett, jan-janssen, liamhuber, pre-commit-ci[bot], pyiron-runner


pympipool's Issues

[feature] openmp support

In addition to CPU parallelisation and GPU support it would be great to add OpenMP parallelisation.

[feature] dynamic management of executors

Use the mpi4py interface to dynamically assign and remove executors in a larger network with multiple executors. Currently, the user would have to manage the network of executors on their own, but having the management part inside the MPI parallel process could be beneficial.

[feature] Add support for OpenMP

Currently, only MPI is used for parallelization, but in principle it should also be possible to use OpenMP in addition to MPI.
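
Until native OpenMP support exists, a possible workaround is to set the OpenMP thread count inside the submitted function before the threaded library is loaded. The following is a rough sketch under that assumption, using the Executor interface from the README above; matching the assigned cores to the requested thread count is not handled here:

from pympipool import Executor

def calc_threaded(i, threads=2):
    # Workaround sketch: OMP_NUM_THREADS must be set before the threaded
    # library (here numpy's BLAS) is imported in the worker process.
    import os
    os.environ["OMP_NUM_THREADS"] = str(threads)
    import numpy as np
    return i, float(np.ones((200, 200)).dot(np.ones((200, 200))).sum())

with Executor(max_cores=2) as exe:
    print(exe.submit(calc_threaded, 1, threads=2).result())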

[bug] Spurious CPU usage with PoolExecutor

Using PoolExecutor incurs strangely high CPU usage. The MWE below shows 40% CPU usage both on the process that runs it and on the mpipool.py process spawned in the background. After the futures are ready, the main process blocks a full core until the executor shuts down. It sounds as if processes are spinning on the ZMQ socket or extra messages are being sent back and forth all the time, because I can make the CPU usage go down while the futures are running by adding sleeps or prints into the main loop of mpipool.py. CPU usage remains high until the executor shuts down, though, even in that case. I haven't figured out yet how to monitor the ZMQ socket, so I cannot confirm that hypothesis.

Using the ProcessPoolExecutor from the standard library this does not happen.

import time
from pympipool import PoolExecutor

def calc(n):
    time.sleep(30)
    return n**2

exe = PoolExecutor(max_workers=4)
f1 = exe.submit(calc, 42)
f2 = exe.submit(calc, 84)
f1.add_done_callback(print)
f2.add_done_callback(print)
time.sleep(60)  # workaround #94 
exe.shutdown(wait=True)

[feature] For testing it would be great to have an executor which supports tasks with more than one core

Concept:

from concurrent.futures import Executor, Future, ProcessPoolExecutor
import queue
from threading import Thread
from time import sleep

from pympipool.shared import cancel_items_in_queue


def get_callback(future_external):
    def callback(fn):
        if fn.cancelled():
            future_external.cancel()
        else:
            future_external.set_result(fn.result())
    return callback   


def refresh(running_dict, todo_lst, max_workers, exe):
    running_dict = {k: v for k, v in running_dict.items() if not k.done()}
    cores_available = max_workers - sum(running_dict.values())
    if len(todo_lst) > 0 and cores_available >= todo_lst[0]["cores"]:
        task_dict = todo_lst.pop(0)
        fn_external = task_dict["future"]
        if fn_external.set_running_or_notify_cancel():
            running_dict[fn_external] = task_dict["cores"]
            fn_internal = exe.submit(task_dict["fn"], *task_dict["args"], **task_dict["kwargs"])
            fn_internal.add_done_callback(get_callback(future_external=fn_external))
        refresh(running_dict=running_dict, todo_lst=todo_lst, max_workers=max_workers, exe=exe)


def execute_task_dict(task_dict, todo_lst):
    if "fn" in task_dict.keys() or "future" in task_dict.keys():
        todo_lst.append(task_dict)
        return True
    elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
        return False
    else:
        raise ValueError("Unrecognized Task in task_dict: ", task_dict)


def background_execution(future_queue, exe_args, exe_kwargs, sleep_interval=0.1):
    todo_lst = []
    running_dict = {}
    with ProcessPoolExecutor(*exe_args, **exe_kwargs) as exe:
        while True:
            try:
                task_dict = future_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                if execute_task_dict(task_dict=task_dict, todo_lst=todo_lst):
                    future_queue.task_done()
                else:
                    future_queue.task_done()
                    future_queue.join()
                    break
            refresh(running_dict=running_dict, todo_lst=todo_lst, max_workers=exe._max_workers, exe=exe)
            sleep(sleep_interval)


class MultiProcessPoolExecutor(Executor):
    def __init__(self, *args, **kwargs):
        self._future_queue = queue.Queue()
        self._process = Thread(
            target=background_execution,
            kwargs={
                "exe_args": args, 
                "exe_kwargs": kwargs,
                "future_queue": self._future_queue,
            },
        )
        self._process.start()

    def submit(self, fn, *args, cores=1, **kwargs):
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f, "cores": cores})
        return f

    def shutdown(self, wait=True, *, cancel_futures=False):
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

Example:

from multi import MultiProcessPoolExecutor

def test_addition(a, b):
    return a + b


if __name__ == "__main__":
    with MultiProcessPoolExecutor() as exe:
        fn = exe.submit(test_addition, 1, 2)
        print(fn.result())

[feature] Dependencies between submitted functions

Example:

def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2

with Executor(max_cores=1) as exe:
    future_1 = exe.submit(add_function, 1, parameter_2=2)
    future_2 = exe.submit(add_function, 1, parameter_2=future_1)
    print(future_2.result())

[bug] CPU load when idle

It works for a single core:

lmp = LammpsBase(
    cores=1,
    oversubscribe=False,
    enable_flux_backend=False,
    working_directory=".",
)

but calling:

lmp = LammpsBase(
    cores=2,
    oversubscribe=False,
    enable_flux_backend=False,
    working_directory=".",
)

results in one process with 100% CPU load.

Found by @pmrv

[testing] Expand tests to cover unpickleable stuff?

I would be interested in adding explicit verification that pympipool.Executor can handle un-pickle-able stuff. I have a test suite already here that (almost!) works out-of-the box when you plug in pympipool.Executor, so the simplest thing would be to shove those into the executor tests. However, maybe there is a more generic place to do this sort of thing like in the sockets where the cloudpickling is happening?

(The "almost" is just to do with how things get handled when an exception is raised, which is test for a spec I wrote over in contrib and doesn't need to be how we do it. I am also struggling that pycharm fails to shutdown the processes and finish the tests, so it's possible that this issue goes away entirely once PyCharm plays nicely with Executor.)

[bug] MPICH error message

===================================================================================
=   BAD TERMINATION OF ONE OF YOUR APPLICATION PROCESSES
=   PID 4821 RUNNING AT fv-az557-506
=   EXIT CODE: 15
=   CLEANING UP REMAINING PROCESSES
=   YOU CAN IGNORE THE BELOW CLEANUP MESSAGES
===================================================================================

Limits of nested function

This does not work:

from pympipool import Pool


def my_square(i):
    return i ** 2


def calc(i):
    import numpy as np
    return np.array(my_square(i=i))


with Pool(cores=2) as p:
    print(p.map(function=calc, lst=[1, 2, 3, 4]))

But this does:

from pympipool import Pool


def calc(i):
    import numpy as np

    def my_square(i):
        return i ** 2

    return np.array(my_square(i=i))


with Pool(cores=2) as p:
    print(p.map(function=calc, lst=[1, 2, 3, 4]))

Output truncation

When running a larger calculation, like:

import numpy as np
import pandas
from tqdm import tqdm
from ase.build import bulk
from pympipool import Pool


def body_order(n=32, b=5):
    if b == 2:
        return [[i ,n-i] for i in range(n+1)]
    else:
        lst = []
        for i in range(n+1):
            for j in body_order(n=n-i, b=b-1):
                lst.append([i] + j)
        return lst


def get_elastic_constants(input_para):
    i, conc_lst, number_atoms, element_lst, potential, structure = input_para

    # import
    import numpy as np
    import pyiron_contrib
    from mpi4py import MPI
    from pyiron_atomistics import Project, ase_to_pyiron

    # create project
    project = Project('workflow')

    # Generate structure
    if sum(np.array(conc_lst)==0) != 4:
        mole_fraction = {
            el: conc/number_atoms
            for el, conc in zip(element_lst, conc_lst)
            if conc > 0
        }
        job = project.create_job(project.job_type.SQSJobWithoutOutput, "sqs_" + str(i))
        job._interactive_disable_log_file = True
        job.structure = ase_to_pyiron(structure)
        job.input['mole_fractions'] = mole_fraction
        job.input['iterations'] = 1e6
        job.server.cores = 1
        job.run()
        structure_next = job._lst_of_struct[-1]
    else:
        structure_next = ase_to_pyiron(structure).copy()
        structure_next.symbols[:] = np.array(element_lst)[np.array(conc_lst)!=0][0]

    # Minimize structure
    lmp_mini1 = project.create_job(project.job_type.LammpsInteractiveWithoutOutput, "lmp_mini_" + str(i), delete_existing_job=True)
    lmp_mini1.structure = structure_next
    lmp_mini1.potential = potential
    lmp_mini1.calc_minimize(pressure=0.0)
    lmp_mini1.server.run_mode.interactive = True
    lmp_mini1.interactive_mpi_communicator = MPI.COMM_SELF
    lmp_mini1._interactive_disable_log_file = True  # disable lammps.log
    lmp_mini1.run()
    lmp_mini1.interactive_close()

    # Elastic constants
    lmp_elastic = project.create_job(project.job_type.LammpsInteractiveWithoutOutput, "lmp_elastic_" + str(i), delete_existing_job=True)
    lmp_elastic.structure = lmp_mini1.get_structure()
    lmp_elastic.potential = potential
    lmp_elastic.interactive_enforce_structure_reset = True
    lmp_elastic.interactive_mpi_communicator = MPI.COMM_SELF
    lmp_elastic.server.run_mode.interactive = True
    lmp_elastic._interactive_disable_log_file = True  # disable lammps.log
    elastic = lmp_elastic.create_job(project.job_type.ElasticMatrixJobWithoutFiles, "elastic_" + str(i), delete_existing_job=True)
    elastic._interactive_disable_log_file = True  # disable lammps.log
    elastic.run()

    elastic_constants_lst = [
        elastic._data["C"][0][0],
        elastic._data["C"][0][1],
        elastic._data["C"][0][2],
        elastic._data["C"][0][3],
        elastic._data["C"][0][4],
        elastic._data["C"][0][5],
        elastic._data["C"][1][1],
        elastic._data["C"][1][2],
        elastic._data["C"][1][3],
        elastic._data["C"][1][4],
        elastic._data["C"][1][5],
        elastic._data["C"][2][2],
        elastic._data["C"][2][3],
        elastic._data["C"][2][4],
        elastic._data["C"][2][5],
        elastic._data["C"][3][3],
        elastic._data["C"][3][4],
        elastic._data["C"][3][5],
        elastic._data["C"][4][4],
        elastic._data["C"][4][5],
        elastic._data["C"][5][5],
    ]

    if "Fe" in elastic.ref_job.structure.get_species_symbols():
        conc_Fe = sum(elastic.ref_job.structure.indices==elastic.ref_job.structure.get_species_symbols().tolist().index("Fe")) / len(elastic.ref_job.structure.indices)
    else:
        conc_Fe = 0.0
    if "Ni" in elastic.ref_job.structure.get_species_symbols():
        conc_Ni = sum(elastic.ref_job.structure.indices==elastic.ref_job.structure.get_species_symbols().tolist().index("Ni")) / len(elastic.ref_job.structure.indices)
    else:
        conc_Ni = 0.0
    if "Cr" in elastic.ref_job.structure.get_species_symbols():
        conc_Cr = sum(elastic.ref_job.structure.indices==elastic.ref_job.structure.get_species_symbols().tolist().index("Cr")) / len(elastic.ref_job.structure.indices)
    else:
        conc_Cr = 0.0
    if "Co" in elastic.ref_job.structure.get_species_symbols():
        conc_Co = sum(elastic.ref_job.structure.indices==elastic.ref_job.structure.get_species_symbols().tolist().index("Co")) / len(elastic.ref_job.structure.indices)
    else:
        conc_Co = 0.0
    if "Cu" in elastic.ref_job.structure.get_species_symbols():
        conc_Cu = sum(elastic.ref_job.structure.indices==elastic.ref_job.structure.get_species_symbols().tolist().index("Cu")) / len(elastic.ref_job.structure.indices)
    else:
        conc_Cu = 0.0

    conc_lst = [conc_Fe, conc_Ni, conc_Cr, conc_Co, conc_Cu]
    return None
    # return elastic_constants_lst + conc_lst


if __name__ == "__main__": 
    structure = bulk("Al", cubic=True).repeat([2,2,2])
    element_lst = ["Fe", "Ni", "Cr", "Co", "Cu"]
    number_atoms = len(structure)
    number_species = len(element_lst)
    potential = '2021--Deluigi-O-R--Fe-Ni-Cr-Co-Cu--LAMMPS--ipr1'

    lst_32 = body_order(n=number_atoms, b=number_species)[:120]
    input_para_lst = [
        [i, conc_lst, number_atoms, element_lst, potential, structure]
        for i, conc_lst in enumerate(lst_32)
    ]

    with Pool(cores=12) as p:
        print("Pool started")
        results = p.map(function=get_elastic_constants, lst=input_para_lst)
        print("Pool stoped")

Then I get the error message:

Traceback (most recent call last):
  File "/home/janssen/pyiron/projects/2023-02-14-elastic-mpi/elastic_mpi.py", line 137, in <module>
    results = p.map(function=get_elastic_constants, lst=input_para_lst)
  File "/home/janssen/miniforge3/lib/python3.9/site-packages/pympipool/__init__.py", line 46, in map
    output = self._receive()
  File "/home/janssen/miniforge3/lib/python3.9/site-packages/pympipool/__init__.py", line 59, in _receive
    output = dill.load(self._process.stdout)
  File "/home/janssen/miniforge3/lib/python3.9/site-packages/dill/_dill.py", line 272, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/home/janssen/miniforge3/lib/python3.9/site-packages/dill/_dill.py", line 419, in load
    obj = StockUnpickler.load(self)
_pickle.UnpicklingError: pickle data was truncated

[feature] Serialise executors

As the Executors in pympipool typically use zmq to connect the executor to the process which is executing the functions, it should in principle be possible to serialise the executor - store the information of the executor - and then deserialise the same object again. Still, it is not clear what happens to the messages already submitted, and in addition it is also not clear whether the server process is started by the executor or by the process executing the Python functions.
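
A purely conceptual sketch of what serialising the connection information could look like; the class and attribute names are hypothetical and not part of the current pympipool API:

import zmq

class SerializableExecutorProxy:
    # Hypothetical wrapper: only the zmq endpoint is stored, the live socket
    # is re-created on deserialisation.
    def __init__(self, address):
        self._address = address  # e.g. "tcp://127.0.0.1:5555"
        self._socket = self._connect(address)

    def _connect(self, address):
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.connect(address)
        return socket

    def __getstate__(self):
        return {"address": self._address}

    def __setstate__(self, state):
        self._address = state["address"]
        self._socket = self._connect(self._address)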

[bug] Openmpi error message

===================================================================================
=   BAD TERMINATION OF ONE OF YOUR APPLICATION PROCESSES
=   PID 4827 RUNNING AT fv-az1256-113
=   EXIT CODE: 15
=   CLEANING UP REMAINING PROCESSES
=   YOU CAN IGNORE THE BELOW CLEANUP MESSAGES
===================================================================================
YOUR APPLICATION TERMINATED WITH THE EXIT STRING: Terminated (signal 15)
This typically refers to a problem with your application.
Please see the FAQ page for debugging suggestions

[feature] Simplify distributing tasks over a number of GPUs in a cluster

Current solutions like dask require the user to either start each worker manually:

# If we have four GPUs on one machine
CUDA_VISIBLE_DEVICES=0 dask-worker ...
CUDA_VISIBLE_DEVICES=1 dask-worker ...
CUDA_VISIBLE_DEVICES=2 dask-worker ...
CUDA_VISIBLE_DEVICES=3 dask-worker ...

https://docs.dask.org/en/stable/gpu.html#specifying-gpus-per-machine or to use a third-party package like https://github.com/rapidsai/dask-cuda . With the flux backend and the recent changes in #125, pympipool allows creating one Executor per GPU and then manually distributing the tasks, one per GPU Executor, as sketched below. But managing a number of executors is still more complicated than it has to be.
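
A sketch of the "one Executor per GPU" approach described above, using manual pinning via CUDA_VISIBLE_DEVICES rather than the flux-based GPU binding from #125; the task function and counts are illustrative:

import flux.job
from pympipool import Executor

def run_on_gpu(task_id, gpu_id):
    # Manual pinning sketch: restrict the worker to a single GPU before any
    # GPU-aware library is imported.
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    return task_id, gpu_id

n_gpus = 4
with flux.job.FluxExecutor() as flux_exe:
    # One pympipool Executor per GPU, tasks distributed round-robin.
    executors = [Executor(max_cores=1, executor=flux_exe) for _ in range(n_gpus)]
    futures = [
        executors[i % n_gpus].submit(run_on_gpu, i, gpu_id=i % n_gpus)
        for i in range(8)
    ]
    print([fs.result() for fs in futures])
    for exe in executors:
        exe.shutdown(wait=True)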

[bug] PoolExecutor.shutdown(wait=True) doesn't actually wait

When queuing some functions and then (attempting to) wait on them with the shutdown function, the executor doesn't actually wait until all futures are ready. The MWE is below. Doing it inside a context manager made no difference. When adding an additional time.sleep before shutting down, the done callbacks do fire, so it sounds as if the executor is just not checking whether there are still outstanding futures.

import time
from pympipool import PoolExecutor

def calc(n):
    time.sleep(5)
    return n**2

exe = PoolExecutor(max_workers=4)
f1 = exe.submit(calc, 42)
f2 = exe.submit(calc, 84)
f1.add_done_callback(print)
f2.add_done_callback(print)
exe.shutdown(wait=True)

[bug] not playing well with discovering unittests

Currently all the unit tests use the paradigm cd tests; python -m unittest discover ., while a perfectly reasonable alternative is python -m unittest discover tests. However, the latter fails with error messages like:

Traceback (most recent call last):
  File "/home/runner/work/pympipool/pympipool/pympipool/backend/mpiexec.py", line 90, in <module>
    main()
  File "/home/runner/work/pympipool/pympipool/pympipool/backend/mpiexec.py", line 46, in main
    input_dict = interface_receive(socket=socket)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/share/miniconda3/envs/test/lib/python3.11/site-packages/pympipool/shared/communication.py", line 147, in interface_receive
    return cloudpickle.loads(socket.recv())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'test_meta'
Error: The action has timed out.

As can be seen in #238 Unittests_by_path.

To the best of my understanding, this is a result of these lines in the backend:

# required for flux interface - otherwise the current path is not included in the python path
cwd = abspath(".")
if cwd not in sys.path:
    sys.path.insert(1, cwd)

Since in the latter case "." is the parent path from which discover is invoked, and not the path where the actual test files reside.

This can be fixed by adding the test directory (or directories) to the path (also demonstrated in #238 Unittests_by_path_with_env and Unittests_by_path_with_pythonpath, which both run fine), but it still seems to me like something that should be fixed at the source rather than patched over in GitHub workflows. In particular, I worry that if unittest discover has this sort of problem, other ways users invoke the executors might be similarly affected.

It's not clear to me whether or not there is a good solution to this problem.

Aside: the comment in the snippet above says this is necessary for flux, but if this is really the relevant line for this issue that would imply it is important for all the executors, or?

Signature no longer conforms to Process/ThreadPoolExecutor

Both ProcessPoolExecutor and ThreadPoolExecutor take the init argument max_workers -- pympipool did too until recently, but now this is max_cores.

Are these actually fundamentally different? From my understanding these really are the same parameter, and while I feel that pympipool does indeed use the better name here, I would like to go back to max_workers for the sake of matching the standard library interface. Is that possible? The change seems to have been a tangential bit in a larger commit focused on something else.

[bug] Tests stuck

It seems sometimes the executor is not cleanly shut down, resulting in the tests hanging even though all tests completed successfully.

Meta-data extraction

Hi - is there any way to get information such as the number of cores, number of threads, and number of gpus from the executor class? I'm imagining this would be useful for automated meta-data extraction/storage when you might have a few executors going at once.

[testing] Clean up Exception handling in threads

Python implements:

threading.excepthook(args, /)
Handle uncaught exception raised by [Thread.run()](https://docs.python.org/3/library/threading.html#threading.Thread.run).

The args argument has the following attributes:

  • exc_type: Exception type.
  • exc_value: Exception value, can be None.
  • exc_traceback: Exception traceback, can be None.
  • thread: Thread which raised the exception, can be None.

If exc_type is [SystemExit](https://docs.python.org/3/library/exceptions.html#SystemExit), the exception is silently ignored. Otherwise, the exception is printed out on [sys.stderr](https://docs.python.org/3/library/sys.html#sys.stderr).

If this function raises an exception, [sys.excepthook()](https://docs.python.org/3/library/sys.html#sys.excepthook) is called to handle it.

So most likely this is better than the current RaisingThread implementation.
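
For reference, installing a custom hook is straightforward; a small self-contained example (the hook and thread names are illustrative):

import threading

def log_thread_exception(args):
    # args provides exc_type, exc_value, exc_traceback and thread.
    print(f"Uncaught exception in {args.thread.name}: {args.exc_value!r}")

threading.excepthook = log_thread_exception

def fail():
    raise RuntimeError("boom")

t = threading.Thread(target=fail, name="worker-thread")
t.start()
t.join()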

[documentation] Compare available executors

Compare different kinds of executors to unify the interface:

| Name | Package | Scaling | Interface | Comment |
| --- | --- | --- | --- | --- |
| ThreadPoolExecutor | standard library | single node | Python | |
| ProcessPoolExecutor | standard library | single node | Python | fast on a single node |
| MPIPoolExecutor | mpi4py | MPI | Python | |
| FluxExecutor | flux | Flux | Shell | |
| Executor | pysqa | SLURM | Python | hdf5 support |
| PyFluxExecutor | pympipool | Flux | Python | |
| PyMPIExecutor | pympipool | MPI | Python | |
| PySlurmExecutor | pympipool | SLURM | Python | |
| SubprocessExecutor | pympipool | Subprocess | Shell | conda support |

Open Questions:

  • Is there any SSH support for existing Executors? How to handle transfer of files on remote locations?
  • Can shell tasks and python tasks be combined in a single submission? With flux this should not be an issue and similarly with slurm.

[feature] Scaling for serial functions

At the moment the flux framework or SLURM workload manager are required to scale beyond a single node. We could in principle offer this for serial functions just based on mpi4py using the mpi4py.futures.MPIPoolExecutor. Still given the simple installation of flux-core via conda I do not think this is required any more.
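
For reference, the mpi4py-based alternative mentioned above would look roughly like this; the script has to be launched through the mpi4py.futures module so that one rank acts as the manager and the remaining ranks as workers:

# serial_scaling.py
# run with: mpiexec -n 5 python -m mpi4py.futures serial_scaling.py
from mpi4py.futures import MPIPoolExecutor

def calc(i):
    return i ** 2

if __name__ == "__main__":
    with MPIPoolExecutor() as exe:
        print(list(exe.map(calc, range(8))))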

[feature] control "submit"-level resource distribution

Hi @jan-janssen - is there a way to control the distribution of cores/threads/gpus at executor.submit() time (e.g. per-"job")? I was looking over the more recent versions and it seems like the interface has gone more the way of initializing the executors with this information - but I very likely could have missed something.

[testing] Compare to parsl

#!/bin/env python3

import os

import parsl
from parsl.app.app import python_app, bash_app
from parsl.config import Config
from parsl.channels import LocalChannel
from parsl.providers import SlurmProvider
from parsl.executors import FluxExecutor
from parsl.launchers import SimpleLauncher
from parsl.addresses import address_by_hostname
from parsl.data_provider.files import File

# Update to import config for your machine
config = Config(
    executors=[
        FluxExecutor(
            label="hera_flux",
#            launch_cmd='{flux} start -v 2 {python} {manager} {protocol} {hostname} {port}', 
            launch_cmd='srun --tasks-per-node=1 ' + FluxExecutor.DEFAULT_LAUNCH_CMD,
            provider=SlurmProvider(
                channel=LocalChannel(),
                nodes_per_block=2,
                init_blocks=1,
                partition='pdebug',
                walltime='00:10:00',
                launcher=SimpleLauncher(),
            ),
        )
    ],
)




parsl.load(config)

remote = False
shared_dir = "/usr/workspace/corbett8/Flux/"


@python_app
def py_mpi(parsl_resource_specification={}):
    from mpi4py import MPI

    return MPI.COMM_WORLD.Get_size(), MPI.COMM_WORLD.Get_rank()



print(py_mpi(parsl_resource_specification={"num_tasks": 6}).result())

parsl.clear()

Expose more Slurm Options

I am running into an issue where I need to specify the MPI version in the srun command on one of our machines. Would it be possible to add something like an "srun_options" kwarg to the PySlurmExecutor interface (and possibly others?) to allow this kind of flexibility?

How to handle definitions in the same file?

I'm still playing around with integrating pympipool with pyiron_workflow in this PR. I had been struggling with a couple of tests that hang locally, and on the CI gave errors like ModuleNotFoundError: No module named 'unit' or No module named 'test_macro'.

I managed to whittle things down to this minimal(ish) working example:

Suppose I run a test file, e.g. test_pympipool.py, with the following contents:

from unittest import TestCase

from pympipool import PyMPIExecutor


def add_two(x):
    return x + 2


class TestPyMPIExecutor(TestCase):

    def test_local(self):
        # Works perfectly
        def add_one(x):
            return x + 1

        class Foo:
            def add(self, x):
                return add_one(x)

        foo = Foo()
        executor = PyMPIExecutor()
        fs = executor.submit(foo.add, 1)
        self.assertEqual(2, fs.result())

    def test_semi_local(self):
        # Hangs
        # CI logs make it look like cloudpickle is trying to import from this module,
        # but can't find it
        class Foo:
            def add(self, x):
                return add_two(x)

        foo = Foo()
        executor = PyMPIExecutor()
        fs = executor.submit(foo.add, 1)
        self.assertEqual(3, fs.result())

When the function and class are defined in exactly the same scope, everything works perfectly. Not shown but also working perfectly is if I imported the add_x function from some other accessible location. The only thing that seems to be breaking is when I make a local definition that relies on something else defined in a different scope but the same file.

In a very hand-wavey way this makes sense to me -- the executor sees that add_two is not defined in this scope and so decides it's something that can be pickled by reference; the only problem is that the file it's defined in is not in my import path! In contrast, when add_one is defined in exactly the same scope as the object being pickled, it's clear to the code that this object also needs to be pickled by value.

My questions are:

  • Should this count as a bug that can be fixed here in pympipool, or is what I'm asking for unreasonable? In the latter case, what is the recommended syntax for same-file-different-scope definitions like this, i.e. how to succinctly make sure everything is added to the path, or can I flag particular stuff for by-value treatment? (See the sketch after this list.)
  • On the CI it's super helpful to get the hard crash and the import error message -- why is it only hanging on my local machine, and can we force a crash under these conditions there too?
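
Regarding the by-value question: cloudpickle (version 2.0 and later) offers register_pickle_by_value, which might help here, although whether pympipool's socket layer picks this up in this situation is an assumption. A sketch placed at the top of the test file:

import sys
import cloudpickle

# Register the current module (the test file itself) for by-value pickling,
# so module-level helpers like add_two are shipped to the worker instead of
# being imported there by name.
cloudpickle.register_pickle_by_value(sys.modules[__name__])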

[feature] Reduce the number of threads

Currently, the executor shares a queue with the broker and the broker distributes the individual tasks to the individual workers. Still, it would also be possible to have all workers directly access the queue of the user-facing executor; the only thing that is unclear in that case is how to make sure the initialisation function is communicated to all workers.

[bug] futures negatively impacted by garbage collection

In the process of getting pympipool to play well with pyiron_workflow, I have been playing around a bunch with callbacks and using executors sans the with context. (See #187 and #210 for discussion.)

After repeated narrowing of my MWE, I ended up with this:

from pympipool.mpi.executor import PyMPIExecutor as Executor

mutable = []

def slow_callable():
    from time import sleep
    sleep(1)
    return True

def callback(future):
    mutable.append("Called back")
    
def submit():
    future = Executor().submit(slow_callable)
    future.add_done_callback(callback)
    return future
    
print(mutable)

# 1) Executor is locally scoped variable
# executor = Executor()
# future = executor.submit(slow_callable)

# 2) Executor is never assigned to a variable
# future = Executor().submit(slow_callable)

# 3) Executor is in a different scope
# future = submit()

future.add_done_callback(callback)

print(mutable)
future.result()
print(mutable)

The desired output is

[]
[]
['Called back']

Where the waiting happens when we invoke future.result(). However, this is only the case for option (1), where the executor is assigned its own variable in the same scope as the future object. Options (2) and (3) both give

[]
['Called back']
['Called back']

Where the waiting happens at the submit call.

This led me in the direction of garbage collection, which led me to:

def __del__(self):
    try:
        self.shutdown(wait=True)
    except (AttributeError, RuntimeError):
        pass


Changing the wait=True to wait=False resolves this issue and keeps the tests on my machine passing, but I don't know if it has other side effects.
