Pool Limited Queue Processing in Python

How to process your data in parallel with a predefined number of threads

Konstantin Taletskiy
Towards Data Science


I was recently confronted with a problem: I needed to build a large number (on the order of 100) of Docker containers and then push them to a registry. The Docker SDK for Python provided an excellent handle on that, and together with the `multiprocessing` library it allowed me to parallelize the task very effectively. However, after some initial testing I discovered that pushing multiple images to the registry stalled, likely due to an overload of simultaneous uploads. In my testing, I was only able to run 2–3 simultaneous docker push commands before any new ones I added stalled. At that point I decided to limit the simultaneous uploads to a small number of parallel processes, while still using a large number of processes to facilitate the image builds. A combination of a queue (`multiprocessing.Queue`) for passing work down from the builder processes to the pusher processes, and a process pool (`multiprocessing.Pool`), looked like the best candidate. Yet there are small nuances and gaps in the documentation that took me some time to understand (especially when using multiprocessing on Windows). Below is a small tutorial on how to use these data structures and objects.
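For context, here is a minimal sketch of that build-and-push pattern using the structures this tutorial covers, assuming the Docker SDK for Python (`docker` package) is installed; the repository name, tags, and pool size are hypothetical placeholders:

```
from multiprocessing import Manager, Pool, Process
import docker  # Docker SDK for Python

REPO = 'registry.example.com/app'  # hypothetical registry/repository

def build(tag, q):
    # Build an image from a local Dockerfile, then hand it to the pushers
    client = docker.from_env()
    client.images.build(path='.', tag=f'{REPO}:{tag}')
    q.put(tag)

def push(q):
    # Only a few of these run at once, throttling the uploads
    client = docker.from_env()
    client.images.push(REPO, tag=q.get())

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()
    tags = [str(i) for i in range(100)]

    # Build in parallel with many processes
    for tag in tags:
        Process(target=build, args=(tag, q)).start()

    # Push with at most 3 simultaneous uploads
    p = Pool(3)
    pushers = [p.apply_async(push, (q,)) for _ in tags]
    [r.get() for r in pushers]
```

The rest of the article builds up each of these pieces on a toy problem.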

Problem formulation

Algorithm design

In this toy problem, we have a large array of parallel Processes writing results into a Queue. Alongside them, there is a single reader Process checking the Queue for new items and assigning them to new Processes in a Pool, such that only a small, fixed number of those Processes run at the same time. Let's go through all the elements below.

Process


For our large array of parallel workers, we are going to use multiprocessing.Process(). From the official reference:

Process objects represent activity that is run in a separate process.

Starting a process requires two things: the target function to be called and the Process call itself. Let's take a look:

```
from multiprocessing import Process

def proc(i):
    print(f'I am Process {i}')

if __name__ == '__main__':
    for i in range(10):
        Process(target=proc, args=(i,)).start()
```

In the example above we created 10 Processes and launched them all at the same time. Each process runs an instance of the proc() function with arguments taken from args. Because the order of execution is not guaranteed, running it produces something like:

```
I am Process 6
I am Process 2
I am Process 0
I am Process 3
I am Process 7
I am Process 4
I am Process 8
I am Process 1
I am Process 5
I am Process 9
```

Notice also the interesting syntax of args=(i,). Process requires that args be iterable, and (i) without the comma is just i in parentheses, so both args=(i) and args=i will lead to a TypeError.
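A quick illustration (in CPython, Process converts args to a tuple at construction time, so the error is raised immediately):

```
from multiprocessing import Process

def proc(i):
    print(f'I am Process {i}')

if __name__ == '__main__':
    Process(target=proc, args=(5,)).start()  # OK: one-element tuple
    Process(target=proc, args=5).start()     # TypeError: 'int' object is not iterable
```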

Queue


Now it is time to introduce multiprocessing.Queue(). According to the reference:

Queue() returns a process shared queue implemented using a pipe and a few locks/semaphores.

A Queue allows us to put objects into it and process them elsewhere asynchronously. Importantly, queues are thread and process safe. Let's modify our previous example to add a Queue object and pass it to our parallel Processes:

```
from multiprocessing import Process, Queue

def writer(i, q):
    message = f'I am Process {i}'
    q.put(message)

if __name__ == '__main__':
    # Create multiprocessing queue
    q = Queue()

    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i, q)).start()

    # Read the queue sequentially
    for i in range(10):
        message = q.get()
        print(message)
```

Keep in mind that Queue.get() is a blocking method, so we are not going to miss any messages in that queue.
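As an aside, get() also accepts a timeout, after which a multiprocessing queue raises queue.Empty; here is a small fragment operating on the q from the example above:

```
from queue import Empty

message = q.get()               # blocks until an item is available
try:
    message = q.get(timeout=1)  # or give up after one second
except Empty:
    print('No message arrived within one second')
```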

The next step in solving our problem is to switch to parallel reads from the queue. We could just spawn the reader processes the same way we spawned the writers, but that would permit up to 10 processes to run in parallel. What should we do if we are limited to a smaller number of readers, as in the original problem description?

Pool


Enter multiprocessing.Pool():

“A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation”.

Using a Pool, we can submit as many jobs as we like, but only `processes` worker processes will be active at any given moment.
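As a minimal illustration of that limit (a sketch, not part of the original example), the ten jobs below share three worker processes:

```
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # Ten jobs are submitted, but at most 3 worker processes run at once
    with Pool(processes=3) as p:
        print(p.map(square, range(10)))  # [0, 1, 4, ..., 81]
```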

Let’s see how it will behave if we throw all the readers to the `Pool`:

```
from multiprocessing import Process, Queue, Pool

def writer(i, q):
    message = f'I am Process {i}'
    q.put(message)

def reader(i, q):
    message = q.get()
    print(message)

if __name__ == '__main__':
    # Create multiprocessing queue
    q = Queue()

    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i, q)).start()

    # Create multiprocessing pool
    p = Pool(10)

    # Create a group of parallel readers and start them
    # The number of readers matches the number of writers;
    # however, the number of simultaneously running readers
    # is constrained to the pool size
    for i in range(10):
        p.apply_async(reader, (i, q))
```

However, if we run the code above, we will get no output. What happened? When we called apply_async, code execution immediately moved on and, since nothing else was left in the main function, exited. Thankfully, the multiprocessing reference provides a way to wait for the execution results:

```
from multiprocessing import Process, Queue, Pool

def writer(i, q):
    message = f'I am Process {i}'
    q.put(message)

def reader(i, q):
    message = q.get()
    print(message)

if __name__ == '__main__':
    # Create multiprocessing queue
    q = Queue()

    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i, q)).start()

    # Create multiprocessing pool
    p = Pool(10)

    # Create a group of parallel readers and start them
    # The number of readers matches the number of writers;
    # however, the number of simultaneously running readers
    # is constrained to the pool size
    readers = []
    for i in range(10):
        readers.append(p.apply_async(reader, (i, q)))

    # Wait for the asynchronous reader tasks to finish
    [r.get() for r in readers]
```

This time, if we run the code, we will get the following error: RuntimeError: Queue objects should only be shared between processes through inheritance. A multiprocessing.Manager will enable us to manage the queue and also make it accessible to the different workers:

```
from multiprocessing import Process, Queue, Pool, Manager

def writer(i, q):
    message = f'I am Process {i}'
    q.put(message)

def reader(i, q):
    message = q.get()
    print(message)

if __name__ == '__main__':
    # Create manager
    m = Manager()

    # Create multiprocessing queue
    q = m.Queue()

    # Create a group of parallel writers and start them
    for i in range(10):
        Process(target=writer, args=(i, q)).start()

    # Create multiprocessing pool
    p = Pool(10)

    # Create a group of parallel readers and start them
    # The number of readers matches the number of writers;
    # however, the number of simultaneously running readers
    # is constrained to the pool size
    readers = []
    for i in range(10):
        readers.append(p.apply_async(reader, (i, q)))

    # Wait for the asynchronous reader tasks to finish
    [r.get() for r in readers]
```

Finally, we are able to get the results we expect:

```
> python pl.py
I am Process 1
I am Process 4
I am Process 9
I am Process 8
I am Process 0
I am Process 5
I am Process 7
I am Process 2
I am Process 6
I am Process 3
```

Windows-related quirks

I initially started working on this problem on a Linux-based machine, but later continued on Windows. Unfortunately, many things did not work right away. Here is what you need to know:

1. Interrupting the program execution (Ctrl+C) will not work right away with the code above. The workaround is to add a worker initializer:

```
import signal

def init_worker():
    """
    Pool worker initializer for keyboard interrupt on Windows
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

p = Pool(num_readers, init_worker)
```

2. I was not able to run the code in a Jupyter notebook on Windows unless I moved the worker functions into a separate .py file and imported them into my notebook, as sketched below. Related to that, you won't be able to run the scripts above without wrapping the main code in if __name__ == '__main__':.
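For example, the workers could live in a hypothetical workers.py module that the notebook then imports:

```
# workers.py -- the worker functions live in an importable module,
# so that child processes on Windows can locate them by name
def writer(i, q):
    q.put(f'I am Process {i}')

def reader(i, q):
    print(q.get())
```

In the notebook, `from workers import writer, reader` then works as expected.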

Final Result

As finishing touches, let's add the following:
• delays to imitate CPU-bound work in the reader and writer
• exception handling when waiting for the reader tasks to finish
• a configurable number of writer and reader processes
• some function documentation

Here is the final result:

```
from multiprocessing import Pool, Queue, Process, Manager
import random
import signal
import time

num_writers = 10
num_readers = 3

def writer(i, q):
    """
    Queue writer worker
    """
    # Imitate CPU-bound work happening in writer
    delay = random.randint(1, 10)
    time.sleep(delay)

    # Put the result into the queue
    t = time.time()
    print(f'I am writer {i}: {t}')
    q.put(t)

def init_worker():
    """
    Pool worker initializer for keyboard interrupt on Windows
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def reader(i, q):
    """
    Queue reader worker
    """
    # Read the top message from the queue
    message = q.get()

    # Imitate CPU-bound work happening in reader
    time.sleep(3)
    print(f'I am reader {i}: {message}')

if __name__ == '__main__':
    # Create manager
    m = Manager()

    # Create multiprocessing queue
    q = m.Queue()

    # Create a group of parallel writers and start them
    for i in range(num_writers):
        Process(target=writer, args=(i, q)).start()

    # Create multiprocessing pool
    p = Pool(num_readers, init_worker)

    # Create a group of parallel readers and start them
    # The number of readers matches the number of writers;
    # however, the number of simultaneously running readers
    # is constrained to the pool size
    readers = []
    for i in range(num_writers):
        readers.append(p.apply_async(reader, (i, q)))

    # Wait for the asynchronous reader tasks to finish
    try:
        [r.get() for r in readers]
    except:  # e.g. KeyboardInterrupt
        print('Interrupted')
        p.terminate()
        p.join()
```

If you run it, you will get a result similar to this:

```
> python final.py
I am writer 8: 1580659076.783544
I am writer 3: 1580659076.783544
I am reader 0: 1580659076.783544
I am reader 1: 1580659076.783544
I am writer 7: 1580659079.7990372
I am writer 2: 1580659080.7971141
I am writer 1: 1580659081.785277
I am writer 4: 1580659082.7955923
I am reader 2: 1580659079.7990372
I am reader 3: 1580659080.7971141
I am writer 6: 1580659083.800029
I am writer 0: 1580659084.7862694
I am reader 4: 1580659081.785277
I am writer 9: 1580659085.7819643
I am writer 5: 1580659085.7919443
I am reader 5: 1580659082.7955923
I am reader 6: 1580659083.800029
I am reader 7: 1580659084.7862694
I am reader 8: 1580659085.7819643
I am reader 9: 1580659085.7919443
```

This post was originally published on my blog.
