
Customizing Scikit-Learn Pipelines: Write Your Own Transformer

How to use Pipelines and add custom-made transformers to your processing flow

Photo by seo yeon Lee on Unsplash

Looking for a way to keep your ML flow organised while maintaining flexibility in your processing? Want to work with pipelines while incorporating unique stages into your data processing? This article is a simple step-by-step guide on how to use Scikit-Learn pipelines and how to add custom-made transformers to your pipeline.

Why Pipelines?

If you have been working as a data scientist for long enough, you have probably heard about Scikit-Learn Pipelines. You may have encountered them after working on a messy research project and ending up with a giant notebook full of processing steps and transformations, unsure which steps and parameters were applied in that final, successful attempt that gave good results. If not, you have surely come across them if you have ever had the chance to deploy a model in production.

In short, a pipeline is an object made for data scientists who want their flow of data processing and modeling to be well organised and easily applied to new data. Even the most professional data scientists are human, with limited memory and imperfect organisation skills. Luckily, we have pipelines to help us maintain order, replicability, and… our sanity.

The first part of this post is a short intro on what pipelines are and how to use them. If you are already familiar with pipelines, dig into the second part, where I discuss pipeline customisation.

A very short introduction to pipelines

A pipeline is a list of sequential transformations, followed by a Scikit-Learn estimator object (i.e. an ML model). The pipeline gives us a structured framework for applying transformations to the data and ultimately running our model. It clearly outlines which processing steps we chose to apply, their order, and the exact parameters we used. It also forces us to carry out exactly the same processing on all existing data samples, providing a clear and replicable workflow. Importantly, it enables us to later run the exact same processing steps on new samples. This last point is crucial whenever we want to apply our model to new data – whether to evaluate the model on a test set after fitting it on the train set, or to process new data points and run the model in production.

How to apply a pipeline?

For this post we will follow an example of a simple classification pipeline: we would like to identify individuals at high risk of developing a certain disease in the next year, based on personal and health-related information.

We will use a toy dataset including a few relevant features (this is an artificial dataset which I created for demonstration purposes only).

Let’s load the data and take a look at the first patients:

import pandas as pd
train_df = pd.read_csv('toy_data.csv', index_col = 0)
test_df = pd.read_csv('toy_data_test.csv', index_col = 0)
train_df.head()

Our preprocessing will include imputation of missing values and standard scaling. Next, we will run a RandomForestClassifier estimator.

The code below depicts the basic usage of a pipeline. First, we import the necessary packages. Next, we define the steps of the pipeline: we do this by providing a list of tuples to the pipeline object, where each tuple consists of the step name and the transformer/ estimator object to be applied.

# import relevant packages
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
# define our pipeline
pipe = Pipeline([('imputer', SimpleImputer()),('scaler', StandardScaler()), ('RF', RandomForestClassifier())])

We then fit the Pipeline to the train data and predict the outcome of our test data. During the fitting stage, the necessary parameters of each step are saved, creating a list of transformers which "remember" exactly which transformations to apply and which values to use, followed by a trained model.

Finally, we apply the full pipeline to new data using the predict() method. This runs the transformations on the data and predicts the outcome using the estimator.

X_train = train_df.drop(columns = ['High_risk'])
y_train = train_df['High_risk']
# the test file is assumed to share the same structure as the train file
X_test = test_df.drop(columns = ['High_risk'])
# fit and predict
pipe.fit(X_train, y_train)
pipe.predict(X_test)
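
If you want to check what each step "remembered" during fitting, the fitted attributes of every step are accessible through the pipeline's named_steps attribute. A minimal example (SimpleImputer stores the per-column fill values in statistics_, and StandardScaler stores the per-column means and standard deviations in mean_ and scale_):

# inspect the parameters saved by each fitted step
pipe.named_steps['imputer'].statistics_   # per-column means used to fill missing values
pipe.named_steps['scaler'].mean_          # per-column means used for scaling
pipe.named_steps['scaler'].scale_         # per-column standard deviations used for scaling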

If we also want the predicted values for the train set, we simply call predict() on it after fitting (note that a pipeline only exposes a combined fit_predict() method when its final estimator implements one, which RandomForestClassifier does not):

pipe.predict(X_train)

Customize your pipeline by writing your own transformer

As we already saw, a pipeline is simply a sequence of transformers followed by an estimator, meaning that we can mix and match various processing stages using built-in Scikit-Learn transformers (e.g. SimpleImputer, StandardScaler, etc).

But what if we want to add a specific processing step which is not one of the usual suspects for data processing?

In this example we are trying to identify patients at high risk of developing a certain disease in the upcoming year, based on personal and health-related features. In the previous section we created a pipeline which imputed missing values, scaled the data, and finally applied a Random Forest classifier.

However, after looking at the full dataset we realise that one of the features – age – has some negative or suspiciously high values.

After some investigation we discover that the age field is entered manually and sometimes contains errors. Unfortunately, age is an important feature in our model, so we don’t want to leave it out. We decide (for this example only…) to replace improbable values with the mean age. Fortunately, we can do this by writing a transformer and placing it at the appropriate point within the pipeline.

Here we will write and add a custom-made transformer: AgeImputer. Our new pipeline will now include a new step before the imputer and the scaler:

# AgeImputer takes the maximum plausible age as a parameter (100 here is just an example value)
pipe = Pipeline([('age_imputer', AgeImputer(max_age=100)), ('imputer', SimpleImputer()), ('scaler', StandardScaler()), ('RF', RandomForestClassifier())])

How to write a transformer?

Let’s start by looking into the structure of a transformer and its methods.

A transformer is a Python class. For any transformer to be compatible with Scikit-Learn, it is expected to implement certain methods: fit(), transform(), fit_transform(), get_params() and set_params(). The fit() method fits the transformer to the data; transform() applies the transformation; and the combined fit_transform() method fits and then applies the transformation to the same dataset.

Python classes can conveniently inherit functionality from other classes. More specifically, our transformer can inherit some of these methods from other classes, which means that we don’t have to write them ourselves.

The get_params() and set_params() methods are inherited from the class BaseEstimator. The fit_transform() method is inherited from the TransformerMixin class. This makes our life easier because it means that we only have to implement the fit() and transform() methods in our code, while the rest of the magic will happen on its own.

The code below illustrates the implementation of the fit() and transform() methods of the new AgeImputer transformer described above. Remember, we want our transformer to "remember" the mean age and then replace impossible values with it. The __init__() method (also called the constructor) will initiate an instance of the transformer, with the maximum allowed age as an input. The fit() method will compute and save the mean age value (rounded to match the integer format of age in the data), while the transform() method will use the saved mean age value to apply the transformation to the data.

# import packages
from sklearn.base import BaseEstimator, TransformerMixin

# define the transformer
class AgeImputer(BaseEstimator, TransformerMixin):

    def __init__(self, max_age):
        print('Initialising transformer...')
        self.max_age = max_age

    def fit(self, X, y=None):
        # compute and save the (rounded) mean age
        self.mean_age = round(X['Age'].mean())
        return self

    def transform(self, X):
        print('Replacing impossible age values')
        # replace negative or implausibly high ages with the saved mean
        X = X.copy()
        X.loc[(X['Age'] > self.max_age) | (X['Age'] < 0), 'Age'] = self.mean_age
        return X

If we wish to see the outcome of our transformation we can apply this specific step of the pipeline and view the transformed data:

age_imputed = pipe[0].fit_transform(X_train)
age_imputed

As expected, the impossible values were replaced by the average age based on the train-set.
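
A nice side effect of inheriting from BaseEstimator is that the max_age parameter of our custom step is now part of the pipeline's get_params()/set_params() interface (and could therefore also be tuned with tools such as GridSearchCV). A small illustration, using the step name we chose above:

# the custom step's parameter is addressable as '<step name>__<parameter name>'
pipe.get_params()['age_imputer__max_age']
# change it without redefining the pipeline
pipe.set_params(age_imputer__max_age=90)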

Once we have written our transformer and added it to the pipeline, we can proceed to apply the full pipeline to our data as usual.

pipe.fit(X_train, y_train)
pipe.predict(X_test)
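
Since the final step is a classifier, the same pipeline can also return class probabilities rather than hard labels, which is often more useful in a risk-screening setting. A short example (assuming the positive, high-risk class is labelled 1):

# probability of belonging to the high-risk class
risk_scores = pipe.predict_proba(X_test)[:, 1]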

Spice it up with more complex transformers

The example above depicts a simplified version of reality, where we only wanted to add a small change to an existing pipeline. In real life we might want to add several stages to our pipeline, or even replace the entire preprocessing flow with a single custom-made preprocessing transformer. In such cases, our new transformer class might have additional methods for the various processing stages applied to our data, beyond the fit() and transform() methods. These helper methods will be used within fit() and transform() for various computations and data processing.

But how do we decide which functionalities belong in the fit() method and which belong in the transform() method?

As a general guideline, the fit method computes and saves any information we might need for further computations, whereas the transform method uses the outcome of these computations to change the data. I like to go over the transformation stages one by one and imagine that I am applying them to a new sample. I add each processing stage to the transform method, and then I ask myself the following questions:

  1. Does this stage require any information from the original data? Examples of such information include mean values, standard deviations, and column names, among others. If the answer is yes, the underlying computation belongs in the fit() method, and the processing stage itself belongs in the transform() method. This was the case in the simple AgeImputer() transformer, where we computed the mean value in the fit() method and used it to change the data in the transform() method.

  2. Is this processing stage itself required for extracting information that will be needed at a later processing stage? For instance, I might theoretically have an additional stage downstream which requires the standard deviation of each variable. Assuming I want the standard deviation to be computed on the imputed data, I will have to compute and save the std values of the transformed dataframe. In that case, I will include the processing stage in the fit() method as well as the transform() method, but unlike the transform() method, the fit() method will not return the transformed data. In other words, the fit() method can apply transformations to the data internally if needed, as long as it doesn’t return the altered dataset (see the sketch after this list).

Ultimately, the fit() method will sequentially perform all the necessary computations and save their results, and the transform() method will sequentially apply all processing stages to the data and return the transformed data.
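
To make this concrete, here is a minimal sketch of such a larger preprocessing transformer. It is illustrative only – the class name, the helper method and the scaling-by-std stage are made up for this example, and it assumes (as in the rest of this post) that all features are numeric. fit() saves the mean age for imputation and the standard deviations computed on the already-imputed data, while transform() applies both stages and returns the result.

# a sketch of a larger custom preprocessing transformer (illustrative only)
from sklearn.base import BaseEstimator, TransformerMixin

class RiskPreprocessor(BaseEstimator, TransformerMixin):

    def __init__(self, max_age):
        self.max_age = max_age

    def _impute_age(self, X):
        # helper stage, used by both fit() and transform()
        X = X.copy()
        X.loc[(X['Age'] > self.max_age) | (X['Age'] < 0), 'Age'] = self.mean_age
        return X

    def fit(self, X, y=None):
        # information taken from the original data
        self.mean_age = round(X['Age'].mean())
        # a statistic that must be computed on the already-imputed data:
        # the imputation is applied internally, but the altered data is not returned
        self.stds = self._impute_age(X).std()
        return self

    def transform(self, X):
        # apply all processing stages and return the transformed data
        X = self._impute_age(X)
        X = X / self.stds
        return X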

That’s it!

To conclude…

We started off by applying a pipeline using ready made transformers. We then covered the structure of transformers and learned how to write a custom-made transformer and add it to our pipeline. Finally, we went over the basic rules that determine the logic behind the "fit" and "transform" methods of the transformer.

In case you haven’t started using pipelines yet, I hope I have convinced you that pipelines are your friends, and that they help keep your ML projects organised, replicable, less error-prone, and easy to apply to new data.

If you found this article helpful, or if you have any feedback, I would love to read it in the comments!

