
Python Concurrency – A Brain-Friendly Guide for Data Professionals

Moving data around can be slow. Here's how to squeeze every last bit of performance out of Python.

Photo by Matthew Brodeur on Unsplash

Python is often criticized for being among the slowest programming languages. While that claim holds some weight, it’s vital to point out that Python is often the first programming language newcomers learn, so much of the Python code out there is highly unoptimized.

But Python does have a couple of tricks up its sleeve. Taking advantage of concurrent function execution is stupidly simple to implement, yet it can reduce the runtime of your data pipelines tenfold. Instead of hours, it’ll take minutes. All for free.

Today you’ll see how concurrency works in Python, and you’ll also learn how to deal with exception handling, custom callbacks, and rate limiting. Let’s dig in!


JSON Placeholder – Your Data Source for the Day

The first order of business is configuring a data source. I wanted to avoid anything proprietary and anything that would take ages to set up. JSON Placeholder – a free REST API service – is the perfect candidate.

Image 1 – Available endpoints (image by author)

To be more precise, you’ll use the /posts/<post-id> endpoint that returns a simple JSON object:

Image 2 – Sample API response (image by author)
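
If you want to verify the response shape yourself, a quick snippet does the trick (field values abbreviated here):

import requests

r = requests.get("https://jsonplaceholder.typicode.com/posts/1")
print(r.json())
# {'userId': 1, 'id': 1, 'title': 'sunt aut facere ...', 'body': 'quia et suscipit ...'}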

I hear you saying – "But making an API request takes a split second to finish!" That’s true, but there’s always a way to slow it down.

Default Approach – Run Away and Never Look Back

Since a simple API call takes a split second to run, let’s make it slower to simulate a typical task in your data pipeline. The get_post(post_id: int) function will:

  • Check for a valid post_id – IDs up to 100 are considered valid
  • Sleep for a random number of seconds between 1 and 10
  • Make an API request to get the post, raising an error if the request fails and returning the parsed response otherwise

The sleeping part makes sure the function takes a while to finish:

Python">import time
import random
import requests

def get_post(post_id: int) -> dict:
    # Value check - Posts on the API only go up to ID of 100
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    # API URL
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    # Sleep to imitate a long-running process
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # Fetch the data and return it
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    # To indicate how much time fetching took
    result["fetch_time"] = time_to_sleep
    # Remove the longest key-value pair for formatting reasons
    del result["body"]
    return result

if __name__ == "__main__":
    print(get_post(1))
Image 3 – Default approach fetch results (image by author)

Time to sleep is completely random, so you’re likely to get a different result.

Things get painfully slow when you have to run this function multiple times. Python newcomers often opt for a loop, similar to the example below:

import time
import random
import requests
from datetime import datetime

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

if __name__ == "__main__":
    # Measure the time
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    # Simple iteration
    for post_id in range(1, 11):
        post = get_post(post_id)
        print(post)

    # Print total duration
    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 4 – Default approach fetch results (2) (image by author)

53 seconds! That’s the problem with sequential execution. Luckily for you, concurrency can help.

Say Hello to Your New Best Friend – ThreadPoolExecutor

Instead of a traditional loop, you can use the ThreadPoolExecutor class, a high-level interface for running functions asynchronously on a pool of threads. Since Python 3.8, the default worker count is min(32, number of cores + 4), so if your machine has 8 cores, this class will run your function on 12 threads by default.

The ThreadPoolExecutor class is easy to use, and it manages a pool of worker threads for you. No manual intervention is needed.
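
If you’re curious what that default works out to on your machine, here’s a quick sketch – it also shows how to pin the worker count yourself via the max_workers parameter:

import os
from concurrent.futures import ThreadPoolExecutor

# Default worker count since Python 3.8: min(32, os.cpu_count() + 4)
print(f"{os.cpu_count()} cores -> {min(32, os.cpu_count() + 4)} workers by default")

# You can also cap the pool size explicitly
with ThreadPoolExecutor(max_workers=8) as tpe:
    tpe.submit(print, "Running on a pool capped at 8 workers")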

Let’s see how it works in practice.

The Simplest Way to Run Tasks Concurrently in Python

You’ll want to import ThreadPoolExecutor and the as_completed() function. The custom get_post() function remains unchanged.

The real magic happens below.

Essentially, you’re creating a new ThreadPoolExecutor through the context manager syntax (the most common approach). Inside, you’re using the submit() method to add tasks to the executor. The first parameter of this method is your function name, followed by its parameter values. You can dynamically iterate over a range of post_id values with a list comprehension.

The submit() method returns a Future object.

The as_completed() function will extract and print the result as individual tasks finish executing:

import time
import random
import requests
from datetime import datetime
# New imports
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    # Run post fetching concurrently
    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        # Process task results
        for future in as_completed(futures):
            # Get and display the result
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 5 – Python concurrent execution (image by author)

We’re down to 10 seconds from 53.

Why 10? Because the longest-running call to get_post() happened to sleep for 10 seconds. My machine has 12 CPU cores (16 ThreadPoolExecutor workers), meaning all 10 tasks started running at the same time.

Scaling Things Up

Let’s now see what happens if you decide to concurrently run more tasks than you have workers available. Only a single line of code has changed, as indicated by the uppercase comment in the snippet below:

import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 100 POSTS IN TOTAL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 101)]
        for future in as_completed(futures):
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 6 – Python concurrent execution (2) (image by author)

Long story short, ThreadPoolExecutor will start by running (number of cores + 4) tasks at a time, and will pick up the remaining ones as workers become available.
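
You can see this queueing behavior clearly with fixed-length tasks. A minimal sketch, with the worker count pinned to 16 so the math is predictable:

import time
from concurrent.futures import ThreadPoolExecutor

def one_second_task(n: int) -> int:
    time.sleep(1)
    return n

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=16) as tpe:
    results = list(tpe.map(one_second_task, range(100)))
# 100 tasks / 16 workers -> 7 "waves" of roughly 1 second each
print(f"Took {time.perf_counter() - start:.1f} seconds")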

How to Handle Failure

Sometimes, the function you want to run concurrently will fail. Inside the for future in as_completed(futures): block, you can add a try/except block to implement exception handling.

To demonstrate, try submitting futures for post_ids up to 150 – the get_post() function raises a ValueError for any post_id above 100:

import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 150 POSTS IN TOTAL - 50 WILL FAIL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 151)]
        # Process task results
        for future in as_completed(futures):
            # Your typical try/except block
            try:
                result = future.result()
                print(result)
            except Exception as e:
                print(f"Exception raised: {str(e)}")

    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 7 – Python concurrent execution (3) (image by author)

You can see that exception handling works like a charm. The order of completion is random, but that’s irrelevant here.
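
One thing the snippet above can’t tell you is which post_id actually failed, since the ValueError message doesn’t carry it. A common remedy – a sketch of the with block, reusing get_post() from above – is to key a dictionary by each Future:

with ThreadPoolExecutor() as tpe:
    # Map each future back to the post_id it was submitted with
    future_to_id = {tpe.submit(get_post, post_id): post_id for post_id in range(1, 151)}
    for future in as_completed(future_to_id):
        post_id = future_to_id[future]
        try:
            print(future.result())
        except Exception as e:
            print(f"Post {post_id} failed: {str(e)}")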

Want to Do Custom Stuff? Add Callbacks

In case you don’t want to cram a bunch of code into the for future in as_completed(futures): block, you can use add_done_callback() to invoke a custom Python function. That function gets access to the Future object.

The following code snippet calls future_callback_fn() as each task finishes:

import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

def future_callback_fn(future):
    print(f"[{datetime.now()}] Custom future callback function!")
    # You have access to the future object
    print(future.result())

if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        for future in as_completed(futures):
            # Custom callback
            future.add_done_callback(future_callback_fn)

    # Print total duration
    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 8 – Python concurrent execution (4) (image by author)

This is a great way to keep the code inside __main__ short and sweet.
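
One subtlety worth knowing: as_completed() only yields futures that have already finished, so in the snippet above each callback fires immediately, on the main thread. If you’d rather skip the as_completed() loop entirely and have the callback run the moment a task completes (in the worker thread), you can attach it at submission time. A minimal sketch of the with block, reusing get_post() and future_callback_fn() from above:

with ThreadPoolExecutor() as tpe:
    for post_id in range(1, 11):
        future = tpe.submit(get_post, post_id)
        # Fires as soon as this task completes
        future.add_done_callback(future_callback_fn)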

Rate Limiting – How to Get Around That Pesky HTTP 429 Error

Every data professional works with REST APIs. While calling their endpoints concurrently is a good way to reduce the overall runtime, it can result in an HTTP 429 Too Many Requests status.

The reason is simple – the API owner doesn’t want you making thousands of requests every second for the sake of performance. Or maybe they’re restricting the traffic volume based on your subscription tier.

Whatever the case might be, an easy way around it is to install the requests-ratelimiter library (pip install requests-ratelimiter) and limit how many requests can be made per day, hour, minute, or second.
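
The limits can also be combined across units. A quick sketch, assuming the library’s per_* keyword arguments:

from requests_ratelimiter import LimiterSession

# At most 5 requests per second, and no more than 100 per minute overall
session = LimiterSession(per_second=5, per_minute=100)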

The example below demonstrates how to set a limit of 2 requests per second:

import time
import random
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# New import
from requests_ratelimiter import LimiterSession

# Limit to max 2 calls per second
request_session = LimiterSession(per_second=2)

def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # Use the request_session now
    r = request_session.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result

if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...n")

    # Everything here stays the same
    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 16)]
        for future in as_completed(futures):
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")
Image 9 – Python concurrent execution (5) (image by author)

The code still works flawlessly, but it might take more time to finish. Still, it will be miles ahead of any sequential implementation.


Summing up Python Concurrency

Concurrency in Python requires a couple of additional lines of code, but it’s worth it 9 times out of 10. If your data pipeline fetches data from multiple sources, and the result of one fetch isn’t used as input to another, concurrency is a guaranteed way to speed things up.

Just imagine if you were building a web scraper that parses multiple pages with identical structures. Doing so concurrently would have a massive positive impact on the overall runtime.

Or if you’re downloading data from cloud object storage. Simply write a function to download all files in a folder, and then run it concurrently on the root folder contents.
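
As a rough sketch – the download_file() helper and the file paths below are hypothetical, standing in for whatever your storage SDK provides (e.g., boto3’s download_file()):

from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical helper - swap in your storage SDK's download call
def download_file(path: str) -> str:
    print(f"Downloading {path}...")
    return path

paths = ["root/2025/01.parquet", "root/2025/02.parquet", "root/2025/03.parquet"]

with ThreadPoolExecutor() as tpe:
    futures = [tpe.submit(download_file, path) for path in paths]
    for future in as_completed(futures):
        print(f"Finished: {future.result()}")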

The applications are endless, but it does require a bit of thinking ahead.

Read next:

Python One Billion Row Challenge – From 10 Minutes to 4 Seconds

