Parallel programming

Questions

  • When you need more than one processor, what do you do?

  • How can we use more than one processor/core in Python?

Objectives

  • Understand the major strategies of parallelizing code

  • Understand mechanics of the multiprocessing package

  • Know when to use more advanced packages or approaches

Modes of parallelism

You realize you have more computation to do than one processor can handle in a reasonable time. What do you do?

  1. Profile your code, identify the actual slow spots.

  2. Can you improve your code in those areas? Use an existing library?

  3. Are there any low-effort optimizations that you can make?

  4. Consider using numba or cython to accelerate key functions (a small numba sketch follows this list).

  5. Think about parallelizing.
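
If step 4 applies to your code, here is a minimal sketch of what accelerating a key function with numba can look like (this assumes the optional numba package is installed; the function and data are only illustrative):

import numpy as np
from numba import njit

@njit  # compile this function to machine code the first time it is called
def manual_sum(values):
    total = 0.0
    for x in values:
        total += x
    return total

data = np.random.random(10**6)
print(manual_sum(data))  # the first call also pays the compilation cost

Later calls reuse the compiled version and run at close to compiled-language speed.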

Many times in science you want to parallelize your code: either because the computation takes too much time on one core, or because the code has to be parallel to even be allowed to run on specific hardware (e.g. supercomputers).

Parallel computing is when many different tasks are carried out simultaneously. There are three main models:

  • Embarrassingly parallel: the code does not need to synchronize/communicate with other instances; you can run multiple instances of the code separately and combine the results later. If you can do this, great! (array jobs, task queues, workflow management tools)

  • Multithreading: Parallel threads need to communicate and do so via the same memory (variables, state, etc). (OpenMP, threading)

  • Multiprocessing, message passing: Different processes manage their own memory segments. They share data by communicating (passing messages) as needed. (multiprocessing, MPI).

Warning

Parallel programming is not magic: many things can go wrong, and you can get unexpected results or difficult-to-debug problems. Parallel programming is a fascinating world to get involved in, but make sure you invest enough time to do it well.

See the video by Raymond Hettinger (“See Also” at bottom of page) for an entertaining take on this.

Multithreading and the GIL

The designers of the Python language made the choice that only one thread in a process can run actual Python code at a time; this is enforced by the so-called global interpreter lock (GIL). This means that approaches that work in other languages (C, C++, Fortran) may not work in Python. At first glance, this is bad for parallelism. But it's not all bad:

  • External libraries (NumPy, SciPy, Pandas, etc), written in C or other languages, can release the lock and run multi-threaded.

  • Most input/output releases the GIL, and input/output is slow. The threading library can be used to multithread I/O (a small sketch follows this list).

  • Python libraries like multiprocessing and mpi4py run multiple Python processes and this circumvents the GIL.
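
As a minimal sketch of the I/O point above, the standard-library concurrent.futures.ThreadPoolExecutor can overlap several slow network reads in threads (the URLs below are only placeholders):

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

# placeholder URLs, only to illustrate the pattern
urls = ["https://example.org/a", "https://example.org/b", "https://example.org/c"]

def fetch(url):
    # most of the time here is spent waiting for the network,
    # and the GIL is released while waiting
    with urlopen(url) as response:
        return len(response.read())

with ThreadPoolExecutor(max_workers=3) as executor:
    sizes = list(executor.map(fetch, urls))
print(sizes)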

Consider the following code, which inverts a fairly large symmetric matrix:

import numpy as np
import time

A = np.random.random((4000, 4000))
A = A * A.T  # element-wise product with the transpose makes A symmetric
time_start = time.time()
np.linalg.inv(A)  # this is the step the underlying library can multi-thread
time_end = time.time()
print("time spent for inverting A is", round(time_end - time_start, 2), 's')

If we run this in a Jupyter notebook or as a Python script, NumPy will automatically use multithreading through OpenMP in the underlying linear-algebra library. We can force NumPy to use only one thread by setting an environment variable before starting Python (either export OMP_NUM_THREADS=1 or export MKL_NUM_THREADS=1, depending on how NumPy is compiled on your machine), which will normally result in a significantly longer runtime.
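
If you prefer to stay inside Python, a common trick (a sketch, and not the only way) is to set the environment variable before NumPy is imported, since the threading layer reads it when it is first loaded:

import os
os.environ["OMP_NUM_THREADS"] = "1"  # must happen before importing numpy
os.environ["MKL_NUM_THREADS"] = "1"

import numpy as np
# ... then run the same timing code as above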

multiprocessing

As opposed to threading, Python has a reasonable way of doing something similar with multiple processes: the multiprocessing module. The interface looks a lot like threading, but in the background it creates new processes to get around the global interpreter lock.

As an example, the split-apply-combine (or map-reduce) paradigm is quite useful for many scientific workflows. Suppose you have this function:

def square(x):
    return x*x

You can apply the function to every element in a list using the map() function:

>>> list(map(square, [1, 2, 3, 4, 5, 6]))
[1, 4, 9, 16, 25, 36]

The multiprocessing.pool.Pool class provides an equivalent, but parallelized (via multiprocessing), way of doing this. By default, Pool creates one worker process per CPU and spreads the calculation over the list across them:

>>> from multiprocessing import Pool
>>> with Pool() as pool:
...     pool.map(square, [1, 2, 3, 4, 5, 6])
[1, 4, 9, 16, 25, 36]

Warning

Running the above example interactively in a Jupyter notebook or a Python/IPython terminal may or may not work on your computer! This is a feature and not a bug, as covered in the documentation.

Fortunately, there is a fork of multiprocessing called multiprocess which does work in interactive environments. All we have to do is install it with pip install multiprocess and change the import statement: from multiprocess import Pool.
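
In a standalone script, the standard multiprocessing module works fine as long as the Pool is created under the __main__ guard, because with some start methods the workers re-import the script when they start. A minimal sketch (the file name square_pool.py is just an example):

# save as square_pool.py and run with: python square_pool.py
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool() as pool:
        print(pool.map(square, [1, 2, 3, 4, 5, 6]))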

Exercises, multiprocessing

Parallel-1, multiprocessing

Here you find some code which calculates pi using a stochastic algorithm. You don't really need to worry about how the algorithm works: it draws random points in a 1x1 square and counts how many of them fall inside the unit circle. Copy it into a Jupyter notebook and use the %%timeit cell magic on the computation part (the line directly after %%timeit below):

import random

def sample(n):
    """Make n trials of points in the square.  Return (n, number_in_circle)

    This is our basic function.  By design, it returns everything it
    needs to compute the final answer: both n (even though it is an input
    argument) and n_inside_circle.  To compute our final answer, all we
    have to do is sum up the n:s and the n_inside_circle:s and do our
    computation"""
    n_inside_circle = 0
    for i in range(n):
        x = random.random()
        y = random.random()
        if x**2 + y**2 < 1.0:
            n_inside_circle += 1
    return n, n_inside_circle

%%timeit
n, n_inside_circle = sample(10**6)

pi = 4.0 * (n_inside_circle / n)
pi

Using the multiprocessing.pool.Pool code from the lesson, run the sample function 10 times, each with 10**5 samples only. Combine the results and time the calculation. What is the difference in time taken?

NOTE: If you’re working in an interactive environment and this doesn’t work with the multiprocessing module, install and use the multiprocess module instead!

(optional, advanced) Do the same but with multiprocessing.pool.ThreadPool instead. This works identically to Pool, but uses threads instead of different processes. Compare the time taken.

Parallel-2, running on a cluster (advanced)

How does the pool know how many CPUs to take? What happens if you run on a computer cluster and request only part of the CPUs on a node?
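
As a hint, here is one way to compare what Python reports for the whole node with what your job is actually allowed to use (a sketch; os.sched_getaffinity is only available on Linux):

import os

print("os.cpu_count():", os.cpu_count())              # all CPUs on the node
print("usable CPUs:", len(os.sched_getaffinity(0)))   # CPUs this process may run on (Linux only)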

MPI

The Message Passing Interface (MPI) approach to parallelization works like this (a minimal example follows the list):

  • Tasks (cores) have a rank and are numbered 0, 1, 2, 3, …

  • Each task (core) manages its own memory

  • Tasks communicate and share data by sending messages

  • Many higher-level functions exist to distribute information to other tasks and gather information from other tasks

  • All tasks typically run the entire code, so we have to be careful that they do not all end up doing exactly the same work (each task should handle its own part of the problem)
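
To make these points concrete before the full example, here is a minimal mpi4py "hello world" (run it for instance with mpiexec -n 4 python hello.py, where the file name is just an example):

from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f"Hello from rank {comm.Get_rank()} of {comm.Get_size()} tasks")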

Those who use MPI in C, C++, Fortran, will probably understand the steps in the following example. For learners new to MPI, we can explore this example together.

Here we reuse the example of approximating pi with a stochastic algorithm from above. The lines that turn it into an MPI program are the mpi4py import, the communicator setup (comm, size, rank), the comm.gather call, and the final block that only rank 0 executes:

import random
import time
from mpi4py import MPI


def sample(n):
    """Make n trials of points in the square.  Return (n, number_in_circle)

    This is our basic function.  By design, it returns everything it
    needs to compute the final answer: both n (even though it is an input
    argument) and n_inside_circle.  To compute our final answer, all we
    have to do is sum up the n:s and the n_inside_circle:s and do our
    computation"""
    n_inside_circle = 0
    for i in range(n):
        x = random.random()
        y = random.random()
        if x ** 2 + y ** 2 < 1.0:
            n_inside_circle += 1
    return n, n_inside_circle


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

n = 10 ** 7

if size > 1:
    n_task = int(n / size)
else:
    n_task = n

t0 = time.perf_counter()
_, n_inside_circle = sample(n_task)
t = time.perf_counter() - t0

print(f"before gather: rank {rank}, n_inside_circle: {n_inside_circle}")
n_inside_circle = comm.gather(n_inside_circle, root=0)
print(f"after gather: rank {rank}, n_inside_circle: {n_inside_circle}")

if rank == 0:
    pi_estimate = 4.0 * sum(n_inside_circle) / n
    print(
        f"\nnumber of darts: {n}, estimate: {pi_estimate}, time spent: {t:.2} seconds"
    )

Exercises, MPI

Parallel-3, MPI

We can do this as an exercise or as a demo. Note that this example requires mpi4py and an MPI installation such as Open MPI.

  • Try to run this example on one core: $ python example.py.

  • Then compare the output with a run on multiple cores (in this case 2): $ mpiexec -n 2 python example.py.

  • Can you guess what the comm.gather function does by looking at the print-outs right before and after it?

  • Why do we have the if-statement if rank == 0 at the end?

  • Why did we use _, n_inside_circle = sample(n_task) and not n, n_inside_circle = sample(n_task)?

Coupling to other languages

As mentioned further up in “Multithreading and the GIL”, Python has the global interpreter lock (GIL) which prevents us from using shared-memory parallelization strategies like OpenMP “directly”.

However, an interesting workaround is to couple Python with other languages, which do not have the GIL. This also works well when you don't need parallelism but need an optimized implementation of an algorithm for a small, performance-critical part of the code.

Two strategies are common:

  • Couple Python with compiled languages like C, C++, Fortran, or Rust and let those handle the shared-memory parallelization:

    • C: use the cffi package (C foreign function interface). ctypes is a similar but slightly more primitive module that is in the standard library.

    • C++: use pybind11

    • Fortran: create a C interface using iso_c_binding and then couple the C layer to Python using cffi

    • Rust: use PyO3

  • Let compiled languages do the shared-memory parallelization part (as in above point) and let Python do the MPI work and distribute tasks across nodes using an mpi4py layer.

Coupling Python with other languages using the above tools is not difficult but it goes beyond the scope of this course.
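
As a small taste nonetheless, here is a minimal ctypes sketch calling the cos function from the C math library directly (this assumes a Linux system where the library is available as libm.so.6):

import ctypes

libm = ctypes.CDLL("libm.so.6")        # load the shared C math library
libm.cos.restype = ctypes.c_double     # declare the return type
libm.cos.argtypes = [ctypes.c_double]  # declare the argument types

print(libm.cos(0.0))  # 1.0, computed by compiled C code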

Before you take this route, profile the application to be sure where the bottleneck is.

Of course sometimes coupling languages is not about overcoming bottlenecks but about combining existing programs which have been written in different languages for whatever reason.

Dask and task queues

There are other strategies that go completely beyond the manual parallelization methods above. We won’t go into much detail.

Dask

Dask is an array-model extension and task scheduler. By using its array classes, you can automatically distribute operations across multiple CPUs.

Dask is very popular for data analysis and is used by a number of high-level Python libraries:

  • Dask arrays scale NumPy workflows (see also xarray)

  • Dask dataframes scale Pandas workflows

  • Dask-ML scales Scikit-Learn

Dask divides arrays into many small pieces (chunks), each small enough to fit into memory. Operations are delayed (lazy computing): tasks are queued, and no computation is performed until you actually ask for values to be computed (for instance, by printing mean values). Then data is loaded into memory and the computation proceeds in a streaming fashion, block by block.

Example adapted from dask.org

# Dask arrays implement the NumPy API
import dask.array as da
x = da.random.random(size=(10000, 10000),
                     chunks=(1000, 1000))
y = x + x.T - x.mean(axis=0)  # lazy: this only builds the task graph
y.compute()                   # now the work runs, using multiple threads
# It could also be distributed to multiple machines (dask.distributed)

Exercises, Dask

Dask-Examples (optional)

Dask examples illustrate the usage of dask and can be run interactively through mybinder. Start an interactive session on mybinder and test/run a few dask examples.

Task queues

A task queue has a scheduler which takes a list of small jobs and distributes them to runners for computation. It serves as a synchronization layer and may be useful for embarrassingly parallel jobs.

There are many different task-queue implementations for Python. Job runners ask the queue for the next task that needs to be done. If you can divide your job into many small parts, this approach may be useful to you. However, if you have a cluster with a job scheduler, a task queue may be somewhat redundant.
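
As a toy illustration of the idea (real task queues such as Celery or RQ add persistence, retries, and distribution across machines), here is a sketch using only the standard library, with worker processes pulling jobs from a shared queue:

import multiprocessing as mp

def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is None:                  # sentinel value: no more work
            break
        results.put((item, item * item))  # the actual "computation"

if __name__ == "__main__":
    tasks = mp.Queue()
    results = mp.Queue()
    workers = [mp.Process(target=worker, args=(tasks, results)) for _ in range(4)]
    for w in workers:
        w.start()
    for job in range(10):
        tasks.put(job)                    # enqueue the jobs
    for _ in workers:
        tasks.put(None)                   # one sentinel per worker
    collected = [results.get() for _ in range(10)]
    for w in workers:
        w.join()
    print(sorted(collected))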

See also

Keypoints

  • Pure Python is not very good for highly parallel code.

  • Luckily, it interfaces with many things that are, and these give you the control you need.

  • Combining vectorized functions (NumPy, Scipy, pandas, etc.) with the parallel strategies listed here will get you very far.

  • Another popular framework similar to multiprocessing is joblib.