
MPIRE for Python: MultiProcessing Is Really Easy!

An introduction to MPIRE, the lightning-fast and most user-friendly multiprocessing library for Python.

Photo by Jarek Jordan on Unsplash

Python is a popular programming language for several reasons. Most importantly, it’s easy to set up and learn, which makes for high development speed. However, one of its major drawbacks is execution speed: compared to many other popular programming languages, Python ranks among the slowest. Fortunately, the speed issue has been largely mitigated by writing performance-critical libraries in C and adding Python wrappers (e.g., NumPy). These solutions work well, and they can utilize multithreading for parallel computing. Things start to get tricky when your own pure Python code is slowing you down and you want to parallelize it.

Through multithreading, multiple threads of a single process execute simultaneously. Libraries written in C/C++ can utilize multithreading without issue. Python, however, can’t utilize multithreading for CPU-bound work because of the infamous Global Interpreter Lock (GIL). I won’t go down the rabbit hole of explaining what it does and why it’s still here, as there are plenty of other excellent blog posts about this topic. The thing to remember is that, because of the GIL, Python can’t utilize multithreading for CPU-intensive tasks the way other languages can. (Note: for I/O-intensive tasks and other tasks that release the GIL, multithreading can work well.)

Therefore, Python programmers often need to rely on multiprocessing, where new processes are spawned and executed simultaneously. By spawning new processes, we effectively side-step the GIL. However, to communicate between those processes we need to use pipes or queues. These communication primitives not only make multiprocessing slower, they’re also tricky to use if you’re not that experienced.

There are plenty of Python libraries that offer multiprocessing capabilities and remove the need to write all the boilerplate code for working with processes and inter-process communication. For example, there are the multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor classes, both of which are available in the Python Standard Library. Additionally, there are third-party packages such as Joblib, and distributed computing packages like Dask and Ray, which additionally offer computation across several machines. In my 7 years as a Python programmer, however, I’ve found that multiprocessing on a single machine is often sufficient, and the additional cost of setting up a cluster of workers often isn’t worth the gain.

Yet, even with all these libraries around, none of them worked to my satisfaction. Most have a steep learning curve because they introduce a completely new programming syntax with respect to multiprocessing.Pool, provide poor error handling, or simply don’t offer all the functionality I was looking for. So, four years ago, I created a new package that does all this and more. It has already been used in dozens of projects at Slimmer AI, and after several iterations of feedback and exposure in production environments, it has become the mature package it is today. It is now the go-to multiprocessing library at Slimmer AI, and recently we’ve made it publicly available on GitHub and PyPI. Documentation is available here.

In this blog post I will introduce our multiprocessing library MPIRE (MultiProcessing Is Really Easy) and compare it to existing libraries in terms of functionality, ease of use, and speed. Besides vanilla serial processing, the libraries I benchmark MPIRE against are multiprocessing.Pool, concurrent.futures.ProcessPoolExecutor (a wrapper around multiprocessing.Pool), Joblib, Dask, and Ray. In the remainder of this post I will use ProcessPoolExecutor to refer to concurrent.futures.ProcessPoolExecutor.

An overview of MPIRE

MPIRE builds on top of the popular multiprocessing standard library and mostly follows the same syntax, which makes it very easy to learn. The mpire.WorkerPool class is similar to the multiprocessing.Pool class but adds many more features and configuration options. The major features of MPIRE are:

  • Faster execution than other multiprocessing libraries
  • Intuitive, Pythonic syntax
  • Graceful and user-friendly exception handling
  • Easy use of copy-on-write shared objects with a pool of workers
  • Each worker can have its own state, and with convenient worker initialization and exit functionality, this state can be easily manipulated
  • Progress bar support using tqdm
  • Dashboard support
  • Worker insights to provide insight into your multiprocessing efficiency
  • Child processes can be pinned to specific, or a range of, CPUs
  • Multiple process start methods available, including: fork, forkserver, spawn, and threading (yes, threading)
  • Optionally utilizes dill as serialization backend through multiprocess, enabling parallelization of more exotic objects, lambdas, and functions in IPython and Jupyter notebooks

Going through all features is too much for this blog post. Therefore, I’ve focused on the features that are relevant for almost every multiprocessing job. Please refer to the documentation to learn more about the other features of MPIRE.

Let’s look at a few examples and compare MPIRE to other libraries.

Example 1: Syntax

Suppose we have a time-consuming function:
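For illustration, assume it simply sleeps for one second per call, which matches the timings reported below (an illustrative stand-in, not the exact benchmark function):

```python
import time

def time_consuming_function(x):
    # stand-in for real work: one second per call
    time.sleep(1)
    return x * 2
```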

which we want to call 100 times and parallelize:
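With multiprocessing.Pool and MPIRE, that looks roughly like this (5 workers, matching the setup used in the timings below):

```python
from multiprocessing import Pool
from mpire import WorkerPool

# multiprocessing.Pool
with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, range(100))

# MPIRE: nearly identical syntax
with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(100))
```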

Joblib and Ray introduce a completely new syntax with respect to multiprocessing.Pool. This means it takes more time to learn how to fully and optimally utilize these libraries. In contrast, the syntax of MPIRE is very close to that of multiprocessing.Pool.

Running this function without multiprocessing takes about 100 seconds to complete, while all tested multiprocessing libraries take about 20 seconds. This is because all were configured to create 5 processes, and thus handle 5 tasks in parallel. There’s some minor overhead in starting the child processes and in the inter-process communication, but a speedup close to 5x is expected in this case.

Example 2: Progress bar

Let’s make things a bit more interesting by adding a progress bar. Surely, when you’re running a long task, you want to know the status of the task and the time until completion. We’ll build on the previous example and utilize tqdm as the progress bar library.
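A sketch of both approaches (the MPIRE progress_bar flag is documented; wrapping imap in tqdm is the usual pattern for multiprocessing.Pool):

```python
from multiprocessing import Pool
from tqdm import tqdm
from mpire import WorkerPool

# multiprocessing.Pool: imap yields results lazily, so tqdm can update in real time
with Pool(processes=5) as pool:
    results = list(tqdm(pool.imap(time_consuming_function, range(100)), total=100))

# MPIRE: a single flag suffices
with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(100), progress_bar=True)
```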

In the case of multiprocessing.Pool we must use imap, a lazy version of map that returns a generator, if we want to show real-time progress information. Even though Ray features a dashboard, there’s no progress information to be found there. Both ProcessPoolExecutor and Ray introduce a lot of boilerplate code to get something as trivial as a progress bar working, and Ray takes the cake here. When using MPIRE, we can simply set the progress_bar flag and we’re done.

Example 3: Exception handling

As experienced as you may be, everyone introduces bugs occasionally. Debugging exceptions in a multiprocessing context isn’t always easy. However, multiprocessing libraries could at least make it easier to debug. Let’s look at the following simple example:
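A reconstruction of the function, written so that the division sits on line 3, as the tracebacks below refer to it:

```python
def some_function(x, y, z):
    # line 3 below performs the division and can raise a ZeroDivisionError
    return (x * y) / z
```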

Now, this function screams ZeroDivisionError all over, and that’s intended here for illustration purposes. Let’s see how the different libraries will handle the following data:
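The exact data isn’t essential; any list of argument tuples in which some z is zero will do, for example:

```python
data = [(3, 5, 2),
        (7, 2, 1),
        (6, 1, 0),  # this task divides by zero
        (4, 8, 2)]
```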

We modify the code above a bit to use the new function and data, but it’s mostly similar. See this link for the full code. Let’s look at the output.

Error output of Ray (top), Joblib (middle), and MPIRE (bottom). Image by author.

For most libraries we see exactly which line caused the error, namely return (x * y) / z. Joblib and Ray, however, only indicate that it happened on line 3 in some_function. Even though Ray raises a RayTaskError(ZeroDivisionError), it can still be caught as a regular ZeroDivisionError, so no worries there. The main difference between these tracebacks, however, is that MPIRE shows the arguments passed to the function that caused the error. This makes debugging a lot easier. The only other library to do this is Dask.

MPIRE packs many more features, of which a few are showcased in the benchmarks section up next.


Benchmarks

This post shows results on three benchmarks derived from a 2019 post on Ray by Robert Nishihara. To make the benchmarks fairer, I added the startup and shutdown calls of the client to the total benchmark time and removed the code to ‘warm up’ the Ray workers. The latter was originally done by the author because the initial memory access is slower. Warming up isn’t something you would normally see in practice, and all multiprocessing libraries suffer from it anyway. Additionally, I increased the workload for each benchmark and made it consistent across different numbers of cores, to better see the benefits of adding more workers. Lastly, some of the multiprocessing implementations were optimized further by allowing the use of [Manager](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-managers) objects. Again, this makes for a fairer comparison.

The benchmarks were run on a Linux machine with 20 cores, hyperthreading disabled, and 200GB of RAM (more than plenty for these benchmarks). For each task, experiments were run with different numbers of processes/workers, and results were averaged over 5 runs. The timings were consistent across libraries and benchmarks, so to make the graphs less cluttered the error bars are omitted. All benchmark code can be found here. See the [requirements.txt](https://github.com/sybrenjansen/multiprocessing_benchmarks/blob/main/requirements.txt) file for the dependencies that need to be installed.

Benchmark 1: Numerical Data

This benchmark processes an image using different image filters. The image remains the same for each filter. Therefore, libraries that can somehow send the image to each process only once have a clear advantage. This isn’t possible with multiprocessing.Pool: you would need to resort to multiprocessing.Process and handle all the communication and starting/joining of processes yourself. Not ideal. For MPIRE, the code looks as follows:
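(A sketch along these lines; the gaussian_filter choice and image dimensions are illustrative, not the exact benchmark code.)

```python
import numpy as np
from scipy import ndimage
from mpire import WorkerPool

def apply_filter(image, sigma):
    # 'image' is the shared object; only 'sigma' differs per task
    return ndimage.gaussian_filter(image, sigma=sigma)

image = np.random.rand(2048, 2048)
sigmas = range(1, 21)

# with the default 'fork' start method on Linux, shared objects are inherited
# copy-on-write instead of being pickled and sent for every single task
with WorkerPool(n_jobs=5, shared_objects=image) as pool:
    results = pool.map(apply_filter, sigmas)
```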

It’s as easy as that. The image is passed as a copy-on-write shared object to each process, which means the data isn’t copied; the underlying memory is reused. A copy is only made when a process alters the image data. In our case that doesn’t happen, so we’re safe and processing will be fast.

The timing results are shown in the following figure:

Numerical computation benchmark results averaged over 5 runs. Image by author.

multiprocessing.Pool and ProcessPoolExecutor are clearly underperforming, and we see no added benefit of using more than 4 workers for them. Sending the image around each time clearly causes a lot of overhead. The other libraries, on the other hand, do see a consistent decrease in computation time as we increase the number of workers. Both Joblib and Ray are a bit slower than Dask and MPIRE when using 4 workers or fewer, but they catch up after that. In the end, it is Joblib and MPIRE that come out on top, albeit with a small margin.

In this benchmark, Joblib utilizes its NumPy memory mapping feature to speed up sharing the same image object repeatedly. Disabling this increases the total time for Joblib considerably. Keep in mind, then, that when you do computations on an object that isn’t a NumPy array, Joblib won’t be as fast.

Benchmark 2: Stateful Computation

In the second benchmark, each worker keeps track of its own state and should update it whenever new tasks come in. As in the original Ray post, the task is to process text documents while keeping track of word prefix counts, up to a prefix length of 3 characters. Whenever a certain prefix occurs more than 3 times, that prefix should be returned once all documents have been processed.

A simple class that does streaming prefix counting looks as follows:
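(Reconstructed to be close to the class from the original Ray post.)

```python
from collections import defaultdict

class StreamingPrefixCount:
    def __init__(self):
        self.prefix_count = defaultdict(int)
        self.popular_prefixes = set()

    def add_document(self, document):
        # count every word prefix of up to 3 characters
        for word in document:
            for i in range(1, 4):
                prefix = word[:i]
                self.prefix_count[prefix] += 1
                if self.prefix_count[prefix] > 3:
                    self.popular_prefixes.add(prefix)

    def get_popular(self):
        return self.popular_prefixes
```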

Note: this implementation of StreamingPrefixCount is similar to the one from the original Ray post and isn’t guaranteed to return the correct prefixes across all documents. That doesn’t really matter here, though, as this benchmark is merely meant to illustrate stateful computation.

Libraries that can store local data for each worker, and return that data when all the work is done, are clearly the most suitable for this task. The libraries that support this are Dask, Ray, and MPIRE. Both Dask and Ray support Actors, which enable stateful computation. For Dask, however, the support seems experimental, and I was unable to get it to work without crashes. To circumvent using Actors, I utilized get_worker and sent a special sentinel token at the end to gather results. It isn’t pretty, but it does the job.

For Ray, the Actor functionality works fine. One can use an ActorPool to nicely distribute the workload:
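(A sketch; the actor name and the documents list are illustrative.)

```python
import ray
from ray.util import ActorPool

@ray.remote
class StreamingPrefixCountActor:
    def __init__(self):
        self.counter = StreamingPrefixCount()

    def add_document(self, document):
        self.counter.add_document(document)

    def get_popular(self):
        return self.counter.get_popular()

documents = [["an", "example", "document"]] * 1000  # illustrative data

ray.init()
actors = [StreamingPrefixCountActor.remote() for _ in range(5)]
pool = ActorPool(actors)

# distribute the documents over the actors; list() forces completion
list(pool.map_unordered(lambda actor, doc: actor.add_document.remote(doc), documents))
popular = set().union(*ray.get([actor.get_popular.remote() for actor in actors]))
```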

For MPIRE, we can utilize the [worker_state](https://sybrenjansen.github.io/mpire/usage/workerpool/worker_state.html) and [worker_exit](https://sybrenjansen.github.io/mpire/usage/map/worker_init_exit.html) functionality. In this case, we can also make use of copy-on-write:
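(A sketch following MPIRE’s documented shared_objects and worker_state API; each worker returns its local result through worker_exit.)

```python
from mpire import WorkerPool

documents = [["an", "example", "document"]] * 1000  # illustrative data

def worker_init(documents, worker_state):
    worker_state['counter'] = StreamingPrefixCount()

def process_document(documents, worker_state, idx):
    worker_state['counter'].add_document(documents[idx])

def worker_exit(documents, worker_state):
    # return values are collected via pool.get_exit_results()
    return worker_state['counter'].get_popular()

with WorkerPool(n_jobs=5, shared_objects=documents, use_worker_state=True) as pool:
    pool.map(process_document, range(len(documents)),
             worker_init=worker_init, worker_exit=worker_exit)
    popular = set().union(*pool.get_exit_results())
```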

The amount of boilerplate code is limited compared to the other libraries. As with Ray, there’s the added benefit that the load is automatically balanced over the workers.

multiprocessing.Pool, ProcessPoolExecutor, and Joblib have no support for worker state, so they need to rely on Python Manager objects. The downside of Manager objects is that they live in separate server processes and any state change has to be communicated to them through proxies. These libraries therefore take a big performance hit, even performing worse than serial processing.
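For reference, a minimal sketch of the Manager approach (names and data are illustrative), which shows where the overhead comes from:

```python
from multiprocessing import Manager, Pool

def count_prefixes(shared_counts, document):
    # every update travels to the manager's server process through a proxy,
    # which is exactly where the performance hit comes from
    # (note: this read-modify-write isn't atomic; fine for a sketch)
    for word in document:
        for i in range(1, 4):
            prefix = word[:i]
            shared_counts[prefix] = shared_counts.get(prefix, 0) + 1

documents = [["an", "example", "document"]] * 1000  # illustrative data

with Manager() as manager:
    shared_counts = manager.dict()
    with Pool(processes=5) as pool:
        pool.starmap(count_prefixes, [(shared_counts, doc) for doc in documents])
    counts = dict(shared_counts)
```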

Stateful computation benchmark results averaged over 5 runs. Image by author.

The time for serial processing fluctuates a bit here, even though it can only utilize one worker. For this benchmark, each worker has its own state and keeps local prefix counts. To make the comparison fair, serial processing also uses multiple StreamingPrefixCount objects, equal to the number of workers.

Looking at the results, Dask clearly struggles to keep up with Ray and MPIRE in terms of speed. If Dask were to fix its Actor implementation, it would perhaps be on par. Ray and MPIRE have similar performance, although MPIRE is consistently slightly quicker, by a very small margin.

Benchmark 3: Expensive Initialization

In this benchmark, a neural network model is used to predict labels on an image dataset. (Note: for the sake of simplicity, the same dataset is used for prediction over and over again.) Loading this model only takes a few seconds, but if that is required for each task, that time quickly adds up. Although this benchmark seems similar to the previous one, it doesn’t require keeping track of changes in the worker state.

For the code snippets below, suppose we have a Model class which loads in the model and dataset when created. For multiprocessing.Pool, the code looks as follows:
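(A sketch; the initializer/global pattern is the standard way to do this with multiprocessing.Pool, and Model is the hypothetical class described above, assumed to predict on a batch of its own dataset.)

```python
from multiprocessing import Pool

model = None  # holds the Model instance within each worker

def init_worker():
    # runs once per child process
    global model
    model = Model()

def predict(batch_idx):
    return model.predict(batch_idx)

with Pool(processes=5, initializer=init_worker) as pool:
    results = pool.map(predict, range(100))
```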

For MPIRE, we once again utilize the [worker_state](https://sybrenjansen.github.io/mpire/usage/workerpool/worker_state.html). This time, we also use the [worker_init](https://sybrenjansen.github.io/mpire/usage/map/worker_init_exit.html) feature:
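(A sketch following MPIRE’s documented worker_init and keep_alive options; the loop over runs is illustrative.)

```python
from mpire import WorkerPool

def worker_init(worker_state):
    # load the model once per worker; with keep_alive=True the workers
    # (and therefore the loaded models) survive across map calls
    worker_state['model'] = Model()

def predict(worker_state, batch_idx):
    return worker_state['model'].predict(batch_idx)

with WorkerPool(n_jobs=5, use_worker_state=True, keep_alive=True) as pool:
    for _ in range(3):  # repeated runs reuse the already-loaded models
        results = pool.map(predict, range(100), worker_init=worker_init)
```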

We could again make use of copy-on-write here, but the TensorFlow model doesn’t play nicely with that. Using the worker_init functionality, however, is just as fast. Note that we set keep_alive to True. By default, MPIRE shuts down workers after a single map call to reduce memory needs. As starting and stopping workers is pretty fast with MPIRE, and Python can be quite memory hungry (it tends to keep allocated memory on stand-by), this is usually the desired behavior. In this case, however, we don’t want to reload the model each run, so we keep the workers alive. Conveniently, they’re automatically shut down when exiting the context manager.

Expensive initialization benchmark results averaged over 5 runs. Image by author.

Joblib and Dask both perform pretty well at first, but are quickly overtaken by Ray and MPIRE. multiprocessing.Pool and ProcessPoolExecutor seem to catch up at the end, but keep in mind that as the time to load the model increases, the gap between them and libraries like Ray and MPIRE will grow as well.

Overall Benchmark Performance

When normalizing the benchmark timings to a range of 0–1 and averaging them we see the following trend:

Average normalized benchmark results. Image by author.

All multiprocessing libraries can beat single core performance, which is the whole point. Joblib has a clear edge over multiprocessing.Pool and ProcessPoolExecutor, and in turn Dask beats Joblib, because of its ability to store state. MPIRE and Ray perform even better than Dask, making them the preferred choice.

Although the speed between Ray and MPIRE is similar, the ease of use is what makes MPIRE the more interesting library to use when you are limited to a single computer.


Summary

This blog post introduced MPIRE, a multiprocessing library for Python which is easy to use, packs many features, and consistently beats all other multiprocessing libraries in terms of speed.

Not only does MPIRE have an intuitive, Pythonic syntax, it also has native progress bar support and user-friendly exception tracebacks. The latter will reduce your debugging time and keep your focus where it matters: executing your job as swiftly as possible.

If you find MPIRE a valuable multiprocessing tool for your own work, I’d love to hear from you. I hope it provides the same efficiency gains for you as it has for us at Slimmer AI.

