Python is often criticized for being among the slowest programming languages. While that claim holds some weight, it’s vital to point out that Python is often the first programming language newcomers learn, so a lot of the Python code in the wild is simply unoptimized.
But Python does have a couple of tricks up its sleeve. Taking advantage of concurrent function execution is stupidly simple to implement, yet it can reduce the runtime of your data pipelines tenfold. Instead of hours, it’ll take minutes. All for free.
Today you’ll see how concurrency works in Python, and you’ll also learn how to deal with exception handling, custom callbacks, and rate limiting. Let’s dig in!
JSON Placeholder – Your Data Source for the Day
The first order of business is configuring a data source. I wanted to avoid anything proprietary or anything that would take ages to set up. JSON Placeholder – a free REST API service – is the perfect candidate.

To be more precise, you’ll use the /posts/<post-id> endpoint, which returns a simple JSON object:

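For reference, here’s a quick way to see the shape of that object (the actual title and body values are longer lorem-ipsum strings, abbreviated below):

import requests

r = requests.get("https://jsonplaceholder.typicode.com/posts/1")
print(r.json())
# Roughly: {'userId': 1, 'id': 1, 'title': '...', 'body': '...'}
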
I hear you saying – But making an API request will take a split second to finish! That’s true, but there’s always a way to slow it down.
Default Approach – Run Away and Never Look Back
Since a simple API call takes a split second to run, let’s make it slower to simulate a typical task in your data pipeline. The get_post(post_id: int) function will:
- Check that post_id is valid (anything up to 100 is considered valid)
- Sleep for a random number of seconds between 1 and 10
- Make an API request to get the post, raising an error if one occurs, or returning the response otherwise
The sleeping part makes sure the function takes a while to finish:
import time
import random
import requests


def get_post(post_id: int) -> dict:
    # Value check - posts on the API only go up to an ID of 100
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")

    # API URL
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    # Sleep to imitate a long-running process
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # Fetch the data and return it
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()

    # To indicate how much time fetching took
    result["fetch_time"] = time_to_sleep

    # Remove the longest key-value pair for formatting reasons
    del result["body"]
    return result


if __name__ == "__main__":
    print(get_post(1))

The sleep duration is completely random, so you’re likely to get a different result.
Things get painfully slow when you have to run this function multiple times. Python newcomers often opt for a loop, similar to the example below:
import time
import random
import requests
from datetime import datetime


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


if __name__ == "__main__":
    # Measure the time
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Simple iteration
    for post_id in range(1, 11):
        post = get_post(post_id)
        print(post)

    # Print total duration
    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

53 seconds! That’s the problem with sequential execution: each call has to wait for the previous one to finish. Luckily for you, concurrency can help.
Say Hello to Your New Best Friend – ThreadPoolExecutor
Instead of a traditional loop, you can use the ThreadPoolExecutor class, which is a high-level interface for running functions asynchronously using threads. Since Python 3.8, the default number of workers is min(32, number of cores + 4), so if your machine has 8 cores, this class will run the function on 12 threads by default.
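If you want to verify the default on your machine, here’s a minimal sketch. Note that _max_workers is a private attribute, peeked at here purely for illustration:

import os
from concurrent.futures import ThreadPoolExecutor

# Number of CPU cores reported by the OS
print(os.cpu_count())

# `_max_workers` is private - used here only to illustrate the default
with ThreadPoolExecutor() as tpe:
    print(tpe._max_workers)  # min(32, os.cpu_count() + 4) on Python 3.8+
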
The ThreadPoolExecutor class is easy to use, and it manages a pool of worker threads for you. No manual intervention is needed.
Let’s see how it works in practice.
The Simplest Way to Run Tasks Concurrently in Python
You’ll want to import ThreadPoolExecutor and the as_completed() function. The custom get_post() function remains unchanged.
The real magic happens below.
Essentially, you’re creating a new ThreadPoolExecutor through the context manager syntax (the most common approach). Inside, you’re using the submit() method to add tasks to the executor. The first parameter of this method is your function name, followed by its parameter values. You can dynamically iterate over a range of values for post_id using a list comprehension.
The submit() method returns a Future object.
The as_completed() function yields each Future as it finishes, so you can extract and print the results in completion order:
import time
import random
import requests
from datetime import datetime

# New imports
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Run post fetching concurrently
    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        # Process task results
        for future in as_completed(futures):
            # Get and display the result
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

We’re down to 10 seconds from 53.
Why 10? Because the longest-running call to get_post() decided to sleep for 10 seconds. My machine has 12 CPU cores (which means 16 ThreadPoolExecutor workers), so all 10 tasks could run at the same time.
Scaling Things Up
Let’s now see what happens if you decide to concurrently run more tasks than you have workers available. Only a single line of code was changed, as indicated by the comment above the changed line:
import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 100 POSTS IN TOTAL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 101)]
        for future in as_completed(futures):
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

Long story short, ThreadPoolExecutor will start by running min(32, number of cores + 4) tasks at a time, and proceed with the others as workers become available.
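If you want more tasks in flight at once, you can set the worker count yourself. A minimal sketch, reusing the get_post() function from above (the value 32 is an arbitrary example; pick what suits your workload and the API’s limits):

from concurrent.futures import ThreadPoolExecutor, as_completed

# Override the default worker count - 32 is an arbitrary example value
with ThreadPoolExecutor(max_workers=32) as tpe:
    futures = [tpe.submit(get_post, post_id) for post_id in range(1, 101)]
    for future in as_completed(futures):
        print(future.result())
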
How to Handle Failure
Sometimes, the function you want to run concurrently will fail. Inside the for future in as_completed(futures): block, you can add a try/except block to implement exception handling. Calling future.result() re-raises any exception that was raised inside the worker thread, so that’s where you catch it.
To demonstrate, try submitting futures for post_id values up to 150, since the get_post() function will raise a ValueError for any post_id above 100:
import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 150 POSTS IN TOTAL - 50 WILL FAIL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 151)]
        # Process task results
        for future in as_completed(futures):
            # Your typical try/except block
            try:
                result = future.result()
                print(result)
            except Exception as e:
                print(f"Exception raised: {str(e)}")

    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

You can see that exception handling works like a charm. The completion order is random, but that’s irrelevant.
Want to Do Custom Stuff? Add Callbacks
In case you don’t want to cram a bunch of code into the for future in as_completed(futures): block, you can call add_done_callback() to run a custom Python function. The callback receives the Future object as its only argument.
The following code snippet attaches future_callback_fn() to each future right after submission, so it runs as soon as execution on an individual thread finishes:
import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    r = requests.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


def future_callback_fn(future):
    print(f"[{datetime.now()}] Custom future callback function!")
    # You have access to the future object
    print(future.result())


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        # Custom callback - runs automatically as each future completes
        for future in futures:
            future.add_done_callback(future_callback_fn)

    # Print total duration
    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

This is a great way to keep the code inside __main__ short and sweet.
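One thing to keep in mind: if the task raised an exception, calling future.result() inside the callback re-raises it there. A safer callback sketch checks future.exception() first:

def future_callback_fn(future):
    # `exception()` returns the raised exception, or None if the task succeeded
    error = future.exception()
    if error is not None:
        print(f"Task failed: {error}")
    else:
        print(future.result())
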
Rate Limiting – How to Get Around That Pesky HTTP 429 Error
Every data professional works with REST APIs. While calling their endpoints concurrently is a good way to reduce the overall runtime, it can result in an HTTP 429 Too Many Requests status.
The reason is simple – the API owner doesn’t want you making thousands of requests every second for the sake of performance. Or maybe they’re restricting the traffic volume based on your subscription tier.
Whatever the case might be, an easy way around it is to install the requests-ratelimiter library and limit how many requests can be made per day, hour, minute, or second.
The example below demonstrates how to set a limit of 2 requests per second:
import time
import random
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

# New import
from requests_ratelimiter import LimiterSession

# Limit to max 2 calls per second
request_session = LimiterSession(per_second=2)


def get_post(post_id: int) -> dict:
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)
    # Use the request_session now
    r = request_session.get(url)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    del result["body"]
    return result


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Everything here stays the same
    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 16)]
        for future in as_completed(futures):
            result = future.result()
            print(result)

    time_end = datetime.now()
    print(f"\nAll posts fetched! Took: {(time_end - time_start).seconds} seconds.")

The code still works flawlessly, but might take more time to finish. Still, it will be miles ahead of any sequential implementation.
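The library also lets you stack multiple limits on one session, for example per second and per minute at once (the values below are arbitrary examples):

from requests_ratelimiter import LimiterSession

# Apply several limits at once - arbitrary example values
request_session = LimiterSession(per_second=5, per_minute=100)
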
Summing up Python Concurrency
Concurrency in Python requires a couple of additional lines of code, but it’s worth it 9 times out of 10. If your data pipeline fetches data from multiple sources, and the result of one fetch isn’t used as an input to another, concurrency is a guaranteed way to speed things up.
Just imagine if you were building a web scraper that parses multiple pages with identical structures. Doing so concurrently would have a massive positive impact on the overall runtime.
Or if you’re downloading data from cloud object storage. Simply write a function to download all files in a folder, and then run it concurrently on the root folder contents.
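As a rough sketch of that idea (download_file and the file paths are hypothetical placeholders, not a real cloud SDK):

from concurrent.futures import ThreadPoolExecutor, as_completed

def download_file(file_path: str) -> str:
    # Hypothetical placeholder - swap in your cloud SDK's download call
    ...
    return file_path

files = ["folder/file_1.csv", "folder/file_2.csv", "folder/file_3.csv"]

with ThreadPoolExecutor() as tpe:
    futures = [tpe.submit(download_file, f) for f in files]
    for future in as_completed(futures):
        print(f"Downloaded: {future.result()}")
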
The applications are endless, but it does require a bit of thinking ahead.
Read next:
Python One Billion Row Challenge – From 10 Minutes to 4 Seconds