
3 Tips to Create More Robust Pipelines with Pandas

The path to an efficient and organized workflow.

Pandas is a data analysis and manipulation library, so it can take you from messy raw data to informative insights. Along the way, though, you are likely to perform a series of data cleaning, processing, and analysis operations.

The pipe function helps you design an organized and robust workflow when you have a set of consecutive steps for preprocessing raw data.

In this article, I will share 3 tips to help you design better pipelines.

Before jumping into the tips, let’s briefly go over what a pipeline is and create one. A pipeline refers to a series of operations connected using the pipe function. The functions used in the pipeline need to take a DataFrame as input and return a DataFrame.

I have a DataFrame that contains some mock data:

import numpy as np
import pandas as pd
df = pd.read_csv("sample_dataset.csv")
df.head()

Here is the list of operations that need to be done on this DataFrame:

  • The data type of the date column is string, which needs to be converted to a proper data type.
  • There are missing values in the price column, which need to be filled with the previous price.
  • There are some outliers in the sales quantity column, which need to be removed.

Our pipeline contains 3 steps. We first define the functions for the tasks above.

def handle_dtypes(df):
    df["date"] = df["date"].astype("datetime64[ns]")
    return df
def fill_missing_prices(df):
    # forward-fill: replace each missing price with the previous price
    # (fillna(method="ffill") is deprecated in modern pandas)
    df["price"] = df["price"].ffill()
    return df
def remove_outliers(df):
    return df[df["sales_qty"] <= 2000].reset_index(drop=True)

And, here is the pipeline:

df_processed = (df
                .pipe(handle_dtypes)
                .pipe(fill_missing_prices)
                .pipe(remove_outliers))

The same operations can be done by applying these functions separately. However, the pipe function offers a structured and organized way of combining several functions into a single operation.
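
For comparison, here is roughly what the same operation looks like without pipe:

# Without pipe: nested calls read inside-out...
df_processed = remove_outliers(fill_missing_prices(handle_dtypes(df)))

# ...or the result has to be reassigned after every step
df_processed = handle_dtypes(df)
df_processed = fill_missing_prices(df_processed)
df_processed = remove_outliers(df_processed)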

Depending on the raw data and the tasks, the preprocessing may include more steps. We can add as many steps as needed using the pipe function. As the number of steps increases, the syntax stays cleaner with the pipe function than with separately executed functions.

We now have a functional pipeline, so we can move on to the tips.


1. Start the pipeline exclusively

In the following pipeline, we assign the modified DataFrame to another variable called "df_processed".

df_processed = (df
                .pipe(handle_dtypes)
                .pipe(fill_missing_prices)
                .pipe(remove_outliers))

We might expect the original DataFrame, df, to remain unchanged. However, that is not the case: the functions in the pipeline modify the DataFrame they receive in place (assigning a column mutates the very object that pipe passes along), so the original DataFrame is updated even though we assign the output of the pipeline to another variable.
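
To see this, a quick check along these lines makes the mutation visible (assuming the date column is loaded as strings):

df = pd.read_csv("sample_dataset.csv")   # fresh copy of the raw data
print(df["date"].dtype)                  # object
df_processed = df.pipe(handle_dtypes)
print(df["date"].dtype)                  # datetime64[ns], so df itself changed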

This is not a good practice as we usually want to keep the raw data available to us. The solution is to start the pipeline with an exclusive starting step, which just copies the original DataFrame.

This step can be done using the following function.

def start_pipeline(df):
    return df.copy()

Let’s also update the pipeline accordingly.

df_processed = (df
                .pipe(start_pipeline)
                .pipe(handle_dtypes)
                .pipe(fill_missing_prices)
                .pipe(remove_outliers))

Now whatever we do in the pipeline, the original DataFrame remains unchanged.
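
Rerunning a similar check on a freshly loaded df confirms that only the copy is modified:

df = pd.read_csv("sample_dataset.csv")   # reload the raw data
print(df["date"].dtype)                  # object

df_processed = (df
                .pipe(start_pipeline)
                .pipe(handle_dtypes))

print(df["date"].dtype)                  # still object: the original is untouched
print(df_processed["date"].dtype)        # datetime64[ns]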


2. Adding arguments

Arguments add functionality and flexibility to functions, and the functions we use in a pipeline can have arguments too.

The cool thing is that these arguments can be set from inside the pipeline: pipe passes any additional arguments along to the function it applies.

To demonstrate this, let’s make the remove_outliers function a little more flexible by turning the outlier detection threshold into an argument.

def remove_outliers(df, threshold=2000):
    return df[df["sales_qty"] <= threshold].reset_index(drop=True)

The default value is 2000, so if we don’t pass this argument in the pipeline, the outlier threshold will be 2000.

We can control the threshold value in the pipeline as follows:

df_processed = (df
                .pipe(start_pipeline)
                .pipe(handle_dtypes)
                .pipe(fill_missing_prices)
                .pipe(remove_outliers, threshold=1500))

3. Logging

We have a pipeline that consists of 4 steps. Depending on the raw data and the task at hand, we may need to create pipelines that have several more steps.

In such workflows, it is important to keep track of what happens at each step so it will be easier to debug in case something goes wrong.

We can achieve this by logging some information after each step. In our pipeline, the size of the DataFrame tells us whether anything unexpected has happened.

Let’s print the size of the DataFrame after each step is applied in the pipeline. Since the steps are functions, we can use a Python decorator for this task.

A decorator is a function that takes another function and extends its behavior. The base function is not modified. The decorator wraps it and adds additional functionality.

Here is the decorator we will use on the functions in the pipeline.

from functools import wraps
def logging(func):
    @wraps(func)  # preserve the wrapped function's name for the log message
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        # every pipeline function returns a DataFrame, so we can log its shape
        print(f"The size after {func.__name__} is {result.shape}.")
        return result
    return wrapper

We will "decorate" the functions used in the pipeline as follows:

@logging
def start_pipeline(df):
    return df.copy()
@logging
def handle_dtypes(df):
    df["date"] = df["date"].astype("datetime64[ns]")
    return df
@logging
def fill_missing_prices(df):
    df["price"] = df["price"].ffill()
    return df
@logging
def remove_outliers(df, threshold=2000):
    return df[df["sales_qty"] <= threshold].reset_index(drop=True)

Let’s rerun the pipeline.

df_processed = (df
                .pipe(start_pipeline)
                .pipe(handle_dtypes)
                .pipe(fill_missing_prices)
                .pipe(remove_outliers, threshold=1500))
# output
The size after start_pipeline is (1000, 3).
The size after handle_dtypes is (1000, 3).
The size after fill_missing_prices is (1000, 3).
The size after remove_outliers is (997, 3).

We now have an output that informs us about what happens at each step of the pipeline. You can customize the logging function and add other functionality, such as measuring the time a function takes to execute.
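
For instance, here is a minimal sketch of such an extension (an assumption, not code from the original pipeline) that also reports how long each step takes:

import time
from functools import wraps
def logging(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()              # high-resolution timer
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"The size after {func.__name__} is {result.shape} "
              f"(took {elapsed:.4f} seconds).")
        return result
    return wrapper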




Conclusion

Pipelines are great for organizing data cleaning and processing workflows. The example in this article is simple enough to handle by applying the functions separately. However, consider a workflow with more than 10 steps to apply to the raw data: handling them with separate function calls quickly becomes messy and tedious to debug compared to using a pipeline.


Thank you for reading. Please let me know if you have any feedback.

