Don’t be afraid of parallel programming in Python

When it comes to writing code, I have always been a believer in the rules of optimization, which state:

The first rule of optimization is: Don’t do it.

The second rule of optimization (for experts only) is: Don’t do it yet.

These rules exist because, in general, if you try to rewrite your code to get a speed-up, you will probably waste a lot of time and end up with code that is unreadable, fragile, and only a few milliseconds faster. This is especially true in scientific computing, where we write in high-level languages that use highly optimized libraries to perform the computationally intensive tasks.

However, there are times when those rules of optimization can be broken. And there is a super simple way of leveraging parallel programming in Python that can give you a >10x speed-up.

We all know that when we are writing code in numpy, we want to use vectorized code. So if we wanted to add 2 to every element of a vector, we would never write:

import numpy as np

vector = np.array([1, 2, 3])
new_vector = np.zeros_like(vector)
for n in range(vector.size):
    new_vector[n] = vector[n] + 2

We would instead write:

new_vector = vector + 2

In fact, we can see that the vectorized approach is not only much simpler but also much faster. Using a for loop takes about 300 ms:

import time

array_size = 1000000
vector = np.arange(array_size)

time0 = time.time()
new_vector = np.zeros_like(vector)

for n in range(vector.size):
    new_vector[n] = vector[n] + 2

time1 = time.time()

print(time1 - time0)  # About 300 ms

While the vectorized version takes about 1 ms:

vector = np.arange(array_size)
time0 = time.time()
new_vector = vector + 2
time1 = time.time()
print(time1 - time0)  # About 1 ms

But you don’t have to be a veteran coder to run into situations where you CAN’T vectorize the code. In my example, we’re going to imagine we have a 3D array that represents video data, with shape (n_frames, n_rows, n_cols).

Imagine that each frame needs to be passed through a function from an image processing library, and that this function is computationally expensive. A lot of people would write code very much like the non-vectorized code above, knowing that it is slow but feeling there is nothing they can do about it, e.g.

img_stack = np.random.random((100, 512, 512))  # My fake image stack

def expensive_function(input):
    time.sleep(0.1)  # fake computationally expensive task
    return input

time0 = time.time()
output = np.zeros_like(img_stack)
for f in range(img_stack.shape[0]):
    output[f, :, :] = expensive_function(img_stack[f, :, :])

time1 = time.time()
print(time1 - time0)  # Just over 10 seconds

The trick

So we want to do this task in parallel. The processing of each frame is completely independent of the others, so if we could get each CPU core to work on a subset of the frames, we could speed this process up a lot.

But a lot of people are scared of parallel processing. They think it is always a nightmare of workers, queues and stacks, and a mess of synchronization, thread locks and deadlocks. But let me show you how to do it easily. First, we import the multiprocessing library and figure out how many cores are available:

import multiprocessing
num_processes = multiprocessing.cpu_count()

Then we split our data up so that each chunk we want to process independently is its own element in a list:

chunks = [img_stack[f, :, :] for f in range(img_stack.shape[0])]

And then we pass each chunk to our function in parallel with this magical code:

with multiprocessing.Pool(processes=num_processes) as pool:
    results = pool.map(expensive_function, chunks)
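
pool.map() hands back an ordinary Python list of processed frames, in the same order as the inputs, so if you want your (n_frames, n_rows, n_cols) array back you can simply stack the list again (a small usage sketch, reusing the results from above):

# results is a list of 100 processed 2D frames; np.array() stacks them
# back into a (n_frames, n_rows, n_cols) array.
output = np.array(results)
print(output.shape)  # (100, 512, 512)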

Now, that code will work without any changes in an IPython notebook. If you are running the code as a script, you need to hide the Pool() call inside another function (this is due to the vagaries of how the multiprocessing library works, discussed below). But a minimal version looks like this:

import multiprocessing
import time

import numpy as np

img_stack = np.random.random((100, 512, 512))  # My fake image stack
chunks = [img_stack[f, :, :] for f in range(img_stack.shape[0])]

def expensive_function(input):
    time.sleep(0.1)  # fake computationally expensive task
    return input

def run():
    num_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(expensive_function, chunks)

    return results


if __name__ == "__main__":
    results = np.array(run())
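
If you want to see the pay-off for yourself, you can time the parallel version the same way we timed the serial loop, for example by swapping the __main__ block above for something like this (a sketch; the exact speed-up depends on your core count and the overhead of shipping each frame to a worker process):

if __name__ == "__main__":
    time0 = time.time()
    results = np.array(run())
    time1 = time.time()
    # The serial loop took just over 10 seconds on this fake workload, so
    # expect something in the ballpark of 10 s / num_processes here.
    print(time1 - time0)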

So that is wonderfully simple. But you might have one worry: synchronizing across threads is notoriously difficult. Am I sure that the processed frames are returned in the same order they were put in? Yes, I am sure. Check this out. First, we make a numpy array with values that make it easy to track:

n_frames = 100
arr = np.arange(n_frames).reshape((n_frames, 1, 1))
print(arr)
array([[[ 0]],

       [[ 1]],
...
       [[98]],

       [[99]]])

Then we pass that array in like before:

chunks = [arr[f, :, :] for f in range(arr.shape[0])]

def expensive_function(input):
    time.sleep(0.1)  # fake computationally expensive task
    return input

def run():
    num_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(expensive_function, chunks)

    return results


if __name__ == "__main__":
    results = np.array(run())

And if you look at the output, your results will all be perfectly in order: pool.map() guarantees that the results come back in the same order as the inputs, no matter which worker happened to finish first.
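
You can convince yourself of this with a quick sanity check (a small sketch, reusing the arr, chunks and run() defined above):

if __name__ == "__main__":
    results = np.array(run())
    # Every frame f contained the value f, so the reassembled array should
    # be identical to the one we started with.
    assert np.array_equal(results, arr)
    print("All frames came back in their original order")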

I will finally give you a little warning, and it relates to why the Pool() call needs to be hidden behind an if __name__ == "__main__" guard. The multiprocessing library generally works by having each worker process load your whole script before it runs. So if any functions are called ‘out in the open’ at the top level of the script, they will be called again when the script is loaded by a worker. And if those functions make a call to multiprocessing.Pool(), then they will all spawn new processes, and each of those processes will spawn new processes, and so on. So you do need to be a little careful. But as long as you make sure all your code is compartmentalized in functions, and your entry into the Pool() call is hidden behind an if __name__ condition, this is an easy way to get a big speed boost.

Similarly, don’t just dump your whole program into the pool.map() function. You don’t want to accidentally parallelize loading your data into RAM. But if your data is sitting nicely in memory, and you’re iterating through it, consider the multiprocessing.Pool() approach.
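
To make the failure mode concrete, here is a sketch of the unsafe pattern (a hypothetical unsafe_script.py, not something to actually run). On platforms where multiprocessing starts workers by re-importing the script, the top-level Pool() call runs again in every worker, so each new process tries to create yet more processes; in practice Python will usually abort with a RuntimeError telling you to add the __main__ guard.

# unsafe_script.py -- what NOT to do
import multiprocessing
import time

def expensive_function(input):
    time.sleep(0.1)
    return input

# BAD: this runs at import time, so every worker process that re-imports
# the script also tries to create its own Pool.
with multiprocessing.Pool() as pool:
    results = pool.map(expensive_function, range(100))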
