
Chapter 6 - Tools for parallelization

These tools can be used to parallelize a problem on an HPC cluster (e.g. Vaughan), but also on your local machine if you install MPI, mpi4py and dask_mpi. To execute a Python script script.py in parallel on Vaughan, submit a job script that contains the command

srun python script.py <script parameters>

This will automatically run the script with the requested resources. For more information about submitting job scripts see Submitting jobs on Vaughan.

To run the Python script in parallel on 4 CPUs on your local machine, run this command in a terminal:

mpirun -np 4 python script.py
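
For illustration, here is a minimal sketch of what such a script.py could look like (it assumes mpi4py, introduced below, is installed): every MPI process launched by srun or mpirun executes the whole script, and each process reports its own rank and the script parameters it received.

import sys
from mpi4py import MPI

# Every MPI process launched by srun/mpirun executes this script.
rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
print(f'rank {rank} of {size}: script parameters = {sys.argv[1:]}')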

mpi4py

mpi4py is a Python package that provides bindings for the MPI message-passing standard, used for communication between processes. It is very flexible, but typically requires quite a bit of work to set up all the communication correctly.
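
As a minimal sketch of this message-passing style (an illustration, not taken from the course material), the snippet below sends a Python dictionary from rank 0 to rank 1; run it with mpirun -np 2 python script.py.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # Rank 0 sends a (picklable) Python object to rank 1.
    data = {'value': 42}
    comm.send(data, dest=1, tag=11)
    print(f'rank 0: sent {data}')
elif rank == 1:
    # Rank 1 blocks until the message from rank 0 has arrived.
    data = comm.recv(source=0, tag=11)
    print(f'rank 1: received {data}')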

Note

On your local machine, you must first install an MPI library for your OS before you can pip install mpi4py.

Dask-MPI

The Dask-MPI project makes it easy to deploy Dask from within an existing MPI environment, such as one created with the common MPI command-line launchers mpirun or mpiexec. Such environments are commonly found in high performance supercomputers, academic research institutions, and other clusters where MPI has already been installed. See the Dask documentation for more details.

Dask futures provide a simple approach to distribute the evaluation of a function over a list of arguments across a number of workers and to gather the results. The approach reserves one CPU for the scheduler (rank 0) and one for the client script (rank 1); all remaining ranks are Dask workers, so the job needs at least three MPI processes. Here is a typical example that computes the square of each item in an iterable using an arbitrary number of Dask workers:

from mpi4py import MPI
from dask_mpi import initialize
# initialize() turns rank 0 into the Dask scheduler and ranks 2 and up into
# Dask workers; only rank 1 continues executing the rest of this script.
initialize()
# Create a Client object that connects to the scheduler
from distributed import Client
client = Client()

def square(i):
    result = i*i
    # a print statement, just to see which worker is computing which result:
    print(f'rank {MPI.COMM_WORLD.Get_rank()}: {i=}**2 = {result}')
    # Note that the order of the output of the print statements does NOT 
    # necessarily correspond to the execution order.
    return result

if __name__ == "__main__":
    # Apply the square function to 0, 1, ..., 999 using the available workers
    futures = client.map(square, range(1000))
    # client.map() returns immediately after the work is distributed.
    # Typically, this is well before the actual work itself is finished.
    # Gather the results
    results = client.gather(futures)
    # client.gather() can only return after all the work is done, obviously.
    print('-*# finished #*-')
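
Besides client.map(), which distributes the work for a whole iterable at once, individual tasks can be scheduled with client.submit(), which returns a single future. Below is a minimal sketch along the same lines; the cube() function is just an illustration.

from dask_mpi import initialize
initialize()
from distributed import Client
client = Client()

def cube(i):
    return i*i*i

if __name__ == "__main__":
    # client.submit() schedules a single task and returns a Future immediately.
    future = client.submit(cube, 7)
    # Future.result() blocks until the task has completed on a worker.
    print(future.result())    # prints 343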

Note

At the time of writing, dask is not installed as an LMOD module on the cluster, so you must install it yourself. Make sure you first source the wetppr-env.sh script mentioned in LMOD modules.

To install dask_mpi, run:

> . path/to/wetppr-env.sh     # to set $PYTHONUSERBASE
> python -m pip install --user dask_mpi --upgrade
> python -m pip install --user dask distributed --upgrade

Note

This installs dask_mpi, dask and distributed into the directory specified by $PYTHONUSERBASE. This environment variable must also be available to the job: if $PYTHONUSERBASE is not set in your ~/.bashrc file, you must set it in the job script.

Note

Dask-MPI builds on mpi4py, so both MPI and mpi4py must be available in your environment.