Distributing in Python

Ok, so you've heard about all the hype around distributed computing and want to get in on the goodness! Or maybe you're a distributed computing veteran, but still a python neophyte? Either way, this blog post may be just the python distributed computing primer that you need to get started!

I've recently needed to start diving into the wacky world of distributed computing in python. Typically I only use distributed computing for feature processing in production models. When you have tons of data streaming into your model and need to clean and process it quickly...well scala spark can do wonders! Of course there are plenty of other nice tools like Flink as well, but my colleagues like scala spark so scala spark it is.

However, it's only recently that I've been getting involved with distributed computing in python. Mostly this has to do with research I'm doing around scaling stats models to ridiculously large datasets and very complex processes. Unlike most of my blogs where I bury the lead a bit and try to paint a story (I love stories), I'm starting this blog with the punch line:

  1. Python was not made to be distributed, so distributing in python is not as easy as in other languages.
  2. Not all distribution is created equal.
    1. Handling I/O for a smooth user interface (i.e. in a web app): Use multi-threading
    2. Parallel processes on a single machine: Multi-processing with shared memory
    3. Parallel processes on multiple machines: Multi-processing without shared memory

All of these use cases can be handled by Dask, but Dask is a relatively new package and I've hit a few potholes trying to use it. The first and second use cases can be handled by the multiprocessing package.

You can theoretically also use pyspark to do distributed computing in python, but I don't recommend it. Pyspark has given me and my colleagues many issues in the past. Dask is probably a better choice.

Ok, let's start learning. Here is the blog overview:

  • What is Multi-Threading?
  • What is Multi-Processing?
  • Dask
  • Closing Remarks

What is Multi-Threading?

In computing, a thread is a 'thread of execution'. Each thread represents a task. A process may contain multiple threads which all share the same resources (different processes do not share resources). Threads are lightweight (they are the smallest unit of processing that can be performed on an OS) and have a very specific use.

Since threads share resources, they are bad for distributing work across the CPU. If you have 24 virtual cores in your laptop, the threads in a single python process will still effectively run on one core at a time (more on why when we get to the GIL). Well that sucks! Why would we ever use threads? Well, imagine we were building a website and a user clicks on a button that uploads a picture and takes the user to a different page that displays that picture. It might take a while to upload the picture. If we try to go to the new page before the picture uploads, all hell will break loose. With multithreading we can spin up a new thread to handle the upload and WAIT for this thread to complete before the original thread (handling changes between web pages) moves the user to the new page.

It is for these use cases, where networking, data, and I/O (Input/Output) are important, that we might want to use multi-threading.

Before we go too deep, let's pull out the code. Let's say we have a process that needs to download from five different data sources at once. None of the downloads depends on the others, so we can run them independently. Each download takes 1 second. How long does this take to do on a single thread?

In [1]:
%%time

import time

def fake_download(num):
    time.sleep(1)
    print("Download: {}".format(num))

for i in range(5):
    fake_download(i)
Download: 0
Download: 1
Download: 2
Download: 3
Download: 4
Wall time: 5.01 s

Ugh, 5 seconds? I ain't got time for that! Let's use multi-threading to speed this up.

In [2]:
import threading
from queue import Queue
import time
In [3]:
%%time

def fake_download(num):
    time.sleep(1)
    print("Download: {}".format(num))
    
for i in range(5):
    t = threading.Thread(target=fake_download, args=(i,))
    t.start()
    
t.join() # try commenting this out and then running
Download: 0
Download: 1
Download: 2
Download: 3
Download: 4
CPU times: user 3.44 ms, sys: 3.2 ms, total: 6.64 ms
Wall time: 1.01 s

Wow, it ran in one second(ish)! That's because we kick off each 'download' together rather than waiting for each one to finish.

Of course, I'm trying to get the run time of the entire process. The only way to get an accurate time, as it turns out, is to wait for the download threads to finish. To do this I add t.join() at the end. This makes the main thread wait for the last thread we started to complete before moving on (and since every download takes about a second, by then the others are done too).

Try running the cell without the t.join(). You should see a time in milliseconds. That can't be right! The reason the time is so low is that the %%time magic in the main thread finishes before the download threads are done running.
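
If you want to be strict about it, keep a handle on every thread and join each one, so the timing doesn't hinge on which thread happened to be started last. A small variation on the cell above:

threads = []
for i in range(5):
    t = threading.Thread(target=fake_download, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()  # wait for every download thread, not just the last one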

Sometimes it can be tedious managing threads with the threading module. We can use ThreadPool from multiprocessing to abstract away some of the effort for us.

In [4]:
from multiprocessing.pool import ThreadPool
In [5]:
%%time 

def fake_download(num):
    time.sleep(1)
    print("Download: {}".format(num))

def Parallel(numbers, threads=1):
    pool = ThreadPool(threads)
    results = pool.map(fake_download, numbers)
    pool.close()
    pool.join()
    return results

numbers = [i for i in range(5)]
results = Parallel(numbers, 5)
Download: 0Download: 3Download: 1

Download: 2

Download: 4
CPU times: user 7.78 ms, sys: 4.2 ms, total: 12 ms
Wall time: 1.03 s

Ok, now let's talk about locks. We often want our functions to modify variables. What happens if multiple threads want to update the same variable? Locks are a way to ensure that only one thread can access a variable at a time. We "lock" the variable down while it is in use.

A properly used lock will ensure that when a function reads a variable, that variable will stay the same until the function updates it. If the lock weren't there, a different thread might read or update the variable while the first function was still working with it.
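
Here's a minimal sketch of the idea with a shared counter guarded by a threading.Lock (the counter, thread count, and loop size are just made up for illustration):

import threading

counter = 0
counter_lock = threading.Lock()

def add_one_many_times(n):
    global counter
    for _ in range(n):
        with counter_lock:  # only one thread may read/update counter at a time
            counter += 1

threads = [threading.Thread(target=add_one_many_times, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000 every time; without the lock the total can come up short

The with counter_lock block is what keeps one thread from reading the counter while another is halfway through updating it.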

We won't dive any deeper into the details around multi-threading, but if you want to learn more, you can go here.

It's important to note that multi-threading should be used with caution. Multi-threading incurs overhead and should only be used when needed. Try to avoid multi-threading for basic tasks. Multiple threads also increase the complexity of a program and make it harder to debug.

What is Multi-Processing?

Unlike many other languages, python has a Global Interpreter Lock (GIL). The GIL allows only one thread to execute python bytecode at a time, largely because CPython's memory management is not thread-safe. These safety rails are nice for preventing you from shooting yourself in the foot in many cases, but they limit our python process to using only one core of the CPU at a time.

Sometimes we want to use more than one core. Now certain implementations of python do not have the GIL, like IronPython and Jython. However, we usually aren't working in Jython, so let's assume we need to work around the GIL to make use of multiple cores in our CPU. To use multiple cores we need to spin up multiple processes! Each process will have its own GIL and memory.
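
If you want to see the GIL for yourself, try timing a CPU-bound function with and without threads. A rough sketch (the countdown function and the sizes are arbitrary):

import threading
import time

def count_down(n):
    # pure python busy work: holds the GIL the whole time
    while n > 0:
        n -= 1

start = time.time()
count_down(20_000_000)
count_down(20_000_000)
print("sequential:  %.2f s" % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count_down, args=(20_000_000,))
t2 = threading.Thread(target=count_down, args=(20_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print("two threads: %.2f s" % (time.time() - start))

Because only one thread can hold the GIL at a time, the threaded version usually comes out no faster (and often a bit slower) than just running the two calls back to back.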

I will warn you, while we are taking advantage of modern CPU architectures and leveraging multiple cores, there is a lot of overhead with multi-processing and it should only be used when needed. Many python packages (numpy for example) are very well optimized and can run quickly even without parallelisation.
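
One quick way to convince yourself of the overhead: time a trivial task with and without a process pool. A small sketch (run it as a script; the task and sizes are made up, and the exact numbers will depend on your machine):

import time
import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    start = time.time()
    plain = [square(x) for x in range(10000)]
    print("plain loop:   %.4f s" % (time.time() - start))

    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        pooled = pool.map(square, range(10000))
    print("process pool: %.4f s" % (time.time() - start))

For work this small the pool version usually loses, because starting processes and pickling arguments costs far more than the squaring itself.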

For our first example of multiprocessing we'll assume that each process is running on the same machine. Given this assumption, we may choose to share variables/memory. This places limits on how much we can scale, but it's quick and dirty and works for many use cases.

For fun I print out the job number and the id of the process that completed the job. The shared variable is return_dict. The key in return_dict that prevents workers from overwriting each other is the job number. Each job has a unique id in this case (what I'm calling the job number).

In [6]:
from os import getpid
import multiprocessing
In [7]:
%%time
def worker(procnum, return_dict):
    '''worker function'''
    time.sleep(1)
    print( 'I am number %d in process %d' % (procnum, getpid()) )
    return_dict[procnum] = getpid()
    
manager = multiprocessing.Manager()
return_dict = manager.dict()

jobs = []
for i in range(5):
    p = multiprocessing.Process(target=worker, args=(i,return_dict))
    jobs.append(p)
    p.start()

for proc in jobs:
    proc.join()
print( return_dict.values() )
I am number 0 in process 58186
I am number 1 in process 58187
I am number 2 in process 58188
I am number 3 in process 58189
I am number 4 in process 58190
[58186, 58187, 58188, 58189, 58190]
CPU times: user 16.7 ms, sys: 18.7 ms, total: 35.4 ms
Wall time: 1.04 s

In this next example I do not share memory between processes. I also use a processing pool to define how many processes I want to spin up. In the example above I just spin up a process for each job, but I might have more jobs than I have cores on my CPU. In such a case I would set the number of processes to the number of idle cores. Processes would pick up jobs as they become available.

In [8]:
%%time
def worker(procnum):
    time.sleep(1)
    print( 'I am number %d in process %d' % (procnum, getpid()) )
    return getpid()

pool = multiprocessing.Pool(processes = 5)
output = pool.map(worker, range(5))
pool.close()
pool.join()
print( output )
I am number 2 in process 58193
I am number 1 in process 58192
I am number 3 in process 58194
I am number 0 in process 58191
I am number 4 in process 58195
[58191, 58192, 58193, 58194, 58195]
CPU times: user 14.9 ms, sys: 18.1 ms, total: 33 ms
Wall time: 1.05 s

Dask

Now you may want to distribute to more than one computer/server. For this use case (in python) I recommend Dask. You can also use pyspark if you so choose, but I find Dask to be easier to use in python.

Dask doesn't always play nice with jupyter. There can be some nasty curve balls depending on what port you run jupyter on and what environments you are using. I'll just leave the Dask code below. If it doesn't work with jupyter, I recommend running it in the command line or in a py file.

Run this in a terminal to avoid a 'too many open files' error.

import numpy as np
from dask.distributed import Client
from dask.distributed import as_completed
import functools
import time

def f(x):
    time.sleep(20)
    return x, np.random.normal(0, 1, 1)[0]

client = Client()  # set up local cluster on your laptop
wrapper = functools.partial(client.submit, f)

results = []
for i in range(5):
    results.append(wrapper(i))

for future in as_completed(results):
    print(future.result())

Closing Remarks

Hopefully you have some understanding of distributed computing in python now. It can be a powerful tool when used in the right way for the right kinds of problems. If you are looking for a good use case for trying out distributed computing, why not try to implement Hogwild! training in python?

Hogwild! is a method where each worker performs gradient descent on the weights of a model like linear regression or a neural network. The weight values have no lock on them, so the workers can actually overwrite each other's updates! This is a very easy way to distribute training of a model and has something of a regularizing effect. Here is a nice blog post walking you through it: Implementing Hogwild!
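
To make the idea concrete, here is a minimal, lock-free sketch of Hogwild-style SGD for ordinary least squares using a shared weight array. This is my own toy version, not the implementation from the linked post; run it as a script, and note that the data, learning rate, and step counts are all invented for the example.

import numpy as np
from multiprocessing import Process
from multiprocessing.sharedctypes import RawArray

def hogwild_worker(shared_w, X, y, lr, n_steps):
    # view the shared, lock-free buffer as a numpy array; racing updates are allowed
    w = np.frombuffer(shared_w, dtype=np.float64)
    rng = np.random.default_rng()
    for _ in range(n_steps):
        i = rng.integers(len(y))             # pick a random sample
        grad = (X[i] @ w - y[i]) * X[i]      # gradient of squared error for that sample
        w -= lr * grad                       # unsynchronized write to the shared weights

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(1000, 3))
    true_w = np.array([1.5, -2.0, 0.5])
    y = X @ true_w + rng.normal(scale=0.1, size=1000)

    shared_w = RawArray('d', 3)              # shared weights, initialized to zero, no lock
    workers = [Process(target=hogwild_worker, args=(shared_w, X, y, 0.01, 2000))
               for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(np.frombuffer(shared_w, dtype=np.float64))  # should land near true_w

Each worker tramples over the others' writes from time to time, and the model still converges; that's the whole point of Hogwild!.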

P.S.

After incorporating multiprocessing into a recent open source project I've been contributing to, I realised that there are some useful extensions to the multiprocessing and dask examples above.

For our use case, we will be creating a simple kernel in python and trying to distribute its computation. What is a kernel? For our purposes we only need to think of a kernel as computing the element-wise distances between two arrays. To understand kernels and why they are useful, you should take a look at the kernel trick, support vector machines, kernel density estimates, and Gaussian processes.

Let's code a simple brute force example of a kernel.

In [9]:
import numpy as np

def absolute_distance(a, b):
    return abs(a-b)

def kernel(distance_function, alpha, beta):
    n, m = alpha.shape[0], beta.shape[0]
    covariance = np.full((n, m),0.0)
    for i in range(n):
        for j in range(m):
            covariance[i,j] = distance_function(alpha[i], beta[j])
    return covariance.astype(np.float32).round(3)

alpha = np.random.rand(5)
beta = np.random.rand(5)

kernel(absolute_distance, alpha, beta)
Out[9]:
array([[0.161, 0.03 , 0.078, 0.253, 0.343],
       [0.95 , 0.82 , 0.867, 0.537, 0.447],
       [0.858, 0.728, 0.775, 0.445, 0.355],
       [0.511, 0.38 , 0.428, 0.098, 0.008],
       [0.385, 0.255, 0.302, 0.028, 0.118]], dtype=float32)

Notice that in this brute force method we have two nested for loops. Ugh, for large arrays that could take forever to evaluate! Now there are a number of tricks to make this code run faster, but this post is about multiprocessing. Let's try and come up with a way to solve this problem with multiprocessing.

Let's have each worker evaluate a row of the covariance matrix! First we'll create a pool of workers to draw from. We want the maximum number of workers possible, so we set the pool size to the number of virtual cores on the CPU of my MacBook Pro.

We then define a worker function. This is the function that each worker will evaluate. It will return a single row i of the covariance matrix.

We need to append these rows to the covariance matrix in the right spot, so the log_result function, passed to apply_async via the callback argument, will take each worker's results and place them into the correct row of the covariance matrix.

The meat of the kernel is the apply_async function, which adds a task to the worker pool. However, since we are running asynchronously, it could be that the kernel function finishes before all of the workers return their results! To make sure that the kernel function waits until each worker returns its results, we wait for every result flag to come back (whether successful or not) before returning the covariance matrix.

In [10]:
# worker for evaluating row i of the covariance matrix
def worker(i, alpha_element, beta, m, distance_function):
    output = np.full(m, 0.0)
    for j in range(m):
        output[j] = distance_function(alpha_element, beta[j])
    return i, output.reshape(-1)

# get the number of virtual cores supported by the local machine
pool_size = multiprocessing.cpu_count()
# set the pool size to the max workers available
# (create the pool *after* defining worker so the forked worker processes can find it;
#  otherwise the tasks can fail silently and the callback never fires)
pool = multiprocessing.Pool(processes=pool_size)

def kernel(distance_function, alpha, beta):
    n, m = alpha.shape[0], beta.shape[0]
    covariance = np.full((n, m), 0.0)

    # writes the output of each worker to the covariance matrix
    def log_result(result):
        i, row = result
        covariance[i, :] = row

    # keep track of the result flags of the async tasks
    result_flags = np.full((n), None)
    for i in range(n):
        # arguments for the worker
        args = (i, alpha[i], beta, m, distance_function)
        # put a task in the queue for the worker pool
        result_flag = pool.apply_async(worker, args=args, callback=log_result)
        result_flags[i] = result_flag

    # wait for the tasks to finish
    for result_flag in result_flags:
        result_flag.wait()

    return covariance.astype(np.float32).round(3)

kernel(absolute_distance, alpha, beta)
Out[10]:
array([[0.161, 0.03 , 0.078, 0.253, 0.343],
       [0.95 , 0.82 , 0.867, 0.537, 0.447],
       [0.858, 0.728, 0.775, 0.445, 0.355],
       [0.511, 0.38 , 0.428, 0.098, 0.008],
       [0.385, 0.255, 0.302, 0.028, 0.118]], dtype=float32)

Now we can try the same thing using Dask! The advantage of Dask is that we can distribute over multiple computers/servers much more easily, if we want to, just by modifying the Dask Client.
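
For example, with a scheduler and workers started on other machines, the only change to the code is what we hand to Client. This is just a sketch of the standard dask.distributed setup; the host name below is a hypothetical placeholder:

# on the scheduler machine:
#   dask-scheduler
# on each worker machine:
#   dask-worker tcp://scheduler-host:8786

from dask.distributed import Client

# connect to the remote scheduler instead of spinning up a local cluster
client = Client("tcp://scheduler-host:8786")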

We first create a dask client (like the multiprocessing process pool). I've learned that in certain python environments there are a number of edge cases where Client will throw errors if created with processes=True. I won't dive into the how's and why's, but it's worth noting in case your dask client starts acting up for seemingly no reason. Luckily we don't need to worry about setting a pool size!

We create a dask worker similarly to how we create a worker function for multiprocessing. Much of the code is the same as before. I'll note that I am creating a scattered version of the beta array. I do this because each worker needs access to that same object, and scattering the object among the workers saves on pickling costs for sending information between/to each worker. Remember, everything sent to the workers must be pickled and serialized! I then create a list of future objects which will be the results of the worker processes.

I then gather the future objects to get the results of the workers. Finally I assign the results to their respective rows in the covariance matrix.

In [11]:
from distributed import Client

client = Client(processes=False)

def worker_dask(alpha_element, beta, m, distance_function):
    output = np.full(m, 0.0)
    for j in range(m):
        output[j] = distance_function(alpha_element, beta[j])
    return output.reshape(-1)

def kernel(distance_function, alpha, beta):
    # lengths of each vector to compare
    n, m = alpha.shape[0], beta.shape[0]
    # create an empty array to fill with element wise vector distances
    cov = np.full((n, m), None)
    # scatter the beta array to lessen pickling costs
    scattered_beta = client.scatter(beta)
    # loop through each vector and put future object into list
    futures = [client.submit(worker_dask, alpha[i], scattered_beta, m, distance_function) for i in range(n)]
    # get futures from futures list
    results = client.gather(futures)
    # assign futures to covariance matrix
    for i, row in enumerate(results):
        cov[i, :] = row

    return cov.astype(np.float32).round(3)

kernel(absolute_distance, alpha, beta)
Out[11]:
array([[0.161, 0.03 , 0.078, 0.253, 0.343],
       [0.95 , 0.82 , 0.867, 0.537, 0.447],
       [0.858, 0.728, 0.775, 0.445, 0.355],
       [0.511, 0.38 , 0.428, 0.098, 0.008],
       [0.385, 0.255, 0.302, 0.028, 0.118]], dtype=float32)

I hope you enjoyed this little addendum to my original post. As I work with multiprocessing use cases more and more, I keep learning better and different ways to do things, and I hope to keep sharing my insights on my blog!
