Object-Oriented Machine Learning Pipeline with mlflow for Pandas and Koalas DataFrames

End-to-end process of developing a Spark-enabled machine learning pipeline in Python using Pandas, Koalas, scikit-learn, and mlflow

Yuefeng Zhang, PhD
Towards Data Science


In the article Python Data Preprocessing Using Pandas DataFrame, Spark DataFrame, and Koalas DataFrame, I used a public dataset to evaluate and compare the basic functionality of Pandas, Spark, and Koalas DataFrames in typical data preprocessing steps for machine learning. The main advantage of Koalas is that it supports an easy-to-use API similar to Pandas on Spark.

In this article, I use a more challenging dataset, the Kaggle Interview Attendance Problem, to demonstrate an end-to-end process of developing an Object-Oriented machine learning pipeline in Python for both Pandas and Koalas DataFrames using Pandas, Koalas, scikit-learn, and mlflow. This is achieved by:

  • Developing a data preprocessing pipeline using Pandas DataFrame with scikit-learn pipeline API
  • Developing a data preprocessing pipeline for Spark by combining scikit-learn pipeline API with Koalas DataFrame
  • Developing a machine learning pipeline by combining scikit-learn with mlflow

The end-to-end development process is based on the Cross-industry standard process for data mining (CRISP-DM). As shown in the diagram below, it consists of six major phases:

  • Business Understanding
  • Data Understanding
  • Data Preparation
  • Modeling
  • Evaluation
  • Deployment

Figure 1: CRISP-DM process diagram (refer to source in Wikipedia)

For convenience of discussion, it is assumed that Python 3.6 and the libraries used in this article (Pandas, Koalas, scikit-learn, and mlflow) have been installed on a local machine such as a Mac.

The reason for using Python 3.6 is that certain functionality (e.g., deployment) of the current release of mlflow does not work with Python 3.7.

1. Business Understanding

The first phase is business understanding. The key point in this phase is to understand the business problem to be solved. As an example, the following is a brief description of the Kaggle interview attendance problem:

Given a set of questions that a recruiter asks while scheduling an interview with a candidate, how can the candidate's answers to those questions be used to predict whether the candidate will attend the scheduled interview (yes, no, or uncertain)?

2. Data Understanding

Once the business problem is understood, the next step is to identify where (i.e., data sources) and how we should collect data from which a machine learning solution to the problem can be built.

The dataset for the Kaggle interview attendance problem has been collected from the recruitment industry in India by the researchers over a period of more than 2 years between September 2014 and January 2017.

This dataset is collected with labels (the column of Observed Attendance holds the labels) and thus it is suitable for supervised machine learning.

The following code imports the necessary Python libraries for all the source code in this article, and loads the dataset into a Koalas DataFrame and displays the first five rows of the DataFrame as shown in the table above.

import numpy as np
import pandas as pd
import databricks.koalas as ks
import matplotlib.pyplot as plt
import matplotlib as mpl
from datetime import datetime
import os
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
%matplotlib inline

ks_df = ks.read_csv('Interview_Attendance_Data.csv')
ks_df.head()

3. Data Preparation

The main goal of data preparation is to clean and transform a collected raw dataset into an appropriate format so that the transformed data can be effectively consumed by a target machine learning model.

In the interview attendance raw dataset, the column Name(Cand ID) contains unique candidate identifiers, which have little predictive power and thus can be dropped. In addition, the trailing unnamed columns (i.e., columns _c22 to _c26 for a Koalas DataFrame, or Unnamed: 22 to Unnamed: 26 for a Pandas DataFrame) contain no data and thus can safely be dropped as well.

Except for the date of interview, all of the other columns in the dataset have categorical (textual) values. In order to use machine learning to solve the problem, those categorical values must be transformed into numeric values because most machine learning models, including the one used in this article, can only consume numeric data.

The column of Date of Interview should be split into day, month, and year to increase prediction power, since the individual day, month, and year values tend to be more strongly correlated with seasonal jobs than the date string as a whole.

The columns of Nature of Skillset and Candidate Native location have a large number of unique entries. These will introduce a large number of new derived features after one-hot encoding. Too many features can lead to a curse of dimensionality problem when the dataset is not large enough. To alleviate this problem, the values of these two columns are regrouped into a smaller number of buckets.

The above data preprocessing/transformation can be summarized as the following steps:

  • Bucketing skillset
  • Bucketing candidate native location
  • Parsing interview date
  • Changing categorical values to uppercase and dropping less useful features
  • One-Hot Encoding categorical values

These steps are implemented by developing an Object-Oriented data preprocessing pipeline for both Pandas and Koalas DataFrames by combining Pandas and Koalas DataFrames with scikit-learn pipeline API (i.e., BaseEstimator, TransformerMixin, and Pipeline).

3.1 Transforming Column Values

Several data preprocessing steps share a common operation of transforming the values of a particular column in a DataFrame. However, as described in the Koalas Series documentation, a Koalas Series does not support some of the common Pandas DataFrame and Series indexing mechanisms such as df.iloc[0]. Because of this, there is no simple method of traversing and changing the values of a column in a Koalas DataFrame.

The other difficulty is that Koalas does not allow building a new Koalas Series object from scratch and then adding it as a new column to an existing Koalas DataFrame. It only accepts a new Koalas Series object that is built from the existing columns of that Koalas DataFrame.

The difficulties above are avoided by defining a global function to call the apply() method of a Koalas Series object.

def transformColumn(column_values, func, func_type):
    # Koalas uses the return type annotation of the applied function to determine the result type
    def transform_column(column_element) -> func_type:
        return func(column_element)

    cvalues = column_values
    cvalues = cvalues.apply(transform_column)
    return cvalues
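As a quick sanity check, the same helper can be exercised on a Pandas Series, since Pandas also provides Series.apply(). The following is a minimal sketch under that assumption; the sample values, the index labels, and the known set are made up for illustration and are not taken from the dataset.

```python
import pandas as pd

def transformColumn(column_values, func, func_type):
    # Same helper as above, condensed; func_type becomes the return annotation.
    def transform_column(column_element) -> func_type:
        return func(column_element)
    return column_values.apply(transform_column)

# Hypothetical sample column with a non-default index.
skills = pd.Series(['Java', 'Oracle', 'Cobol'], index=[7, 3, 5])
known = {'Java', 'Oracle'}

bucketed = transformColumn(skills, lambda x: x if x in known else 'Others', str)
print(bucketed.tolist())       # values after bucketing
print(list(bucketed.index))    # the index labels are preserved by apply()
```

Because apply() returns a Series with the same index as its input, the result can be assigned back to the DataFrame it came from without misaligning rows.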

3.2 Bucketing Skillset

To alleviate the curse of dimensionality issue, the transform() method of the BucketSkillset transformer class groups the unique values of the skillset column into a smaller number of buckets by replacing every value that appears fewer than 9 times with the single string value Others.

class BucketSkillset(BaseEstimator, TransformerMixin):
    def __init__(self):
        # skillset values that are kept as-is; every other value becomes 'Others'
        self.skillset = ['JAVA/J2EE/Struts/Hibernate', 'Fresher', 'Accounting Operations', 'CDD KYC', 'Routine', 'Oracle',
                         'JAVA/SPRING/HIBERNATE/JSF', 'Java J2EE', 'SAS', 'Oracle Plsql', 'Java Developer',
                         'Lending and Liabilities', 'Banking Operations', 'Java', 'Core Java', 'Java J2ee', 'T-24 developer',
                         'Senior software engineer-Mednet', 'ALS Testing', 'SCCM', 'COTS Developer', 'Analytical R & D',
                         'Sr Automation Testing', 'Regulatory', 'Hadoop', 'testing', 'Java', 'ETL', 'Publishing']

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):

        func = lambda x: x if x in self.skillset else 'Others'

        X1 = X.copy()
        cname = 'Nature of Skillset'
        cvalue = X1[cname]

        if isinstance(X1, ks.DataFrame):
            cvalue = transformColumn(cvalue, func, str)
            X1[cname] = cvalue
        elif isinstance(X1, pd.DataFrame):
            # Series.map keeps the (shuffled) row index, so the new values stay aligned with their rows;
            # wrapping map() in a fresh pd.Series would reset the index and misalign them
            X1[cname] = cvalue.map(func)
        else:
            print('BucketSkillset: unsupported dataframe: {}'.format(type(X1)))

        return X1
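The list of kept skillset values above is hard-coded. As a rough sketch of how such a list could be derived from the data with Pandas, the snippet below counts the occurrences of each value and keeps only those that reach a minimum count. The helper name frequent_values, the toy data, and the threshold of 2 are assumptions made for illustration; they are not part of the original notebook.

```python
import pandas as pd

def frequent_values(series, min_count):
    """Return the values that occur at least min_count times (hypothetical helper)."""
    counts = series.value_counts()
    return counts[counts >= min_count].index.tolist()

# Toy skillset column: 'Cobol' appears only once.
skills = pd.Series(['Java'] * 3 + ['Oracle'] * 2 + ['Cobol'])

keep = frequent_values(skills, min_count=2)
# Keep frequent values, replace everything else with 'Others'.
bucketed = skills.where(skills.isin(keep), 'Others')

print(sorted(keep))
print(bucketed.tolist())
```

With the real dataset, the same idea would use a threshold of 9 for the skillset column, as described in the text.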

3.3 Bucketing Candidate Native Location

Similarly to bucketing skillset, to alleviate the curse of dimensionality issue, the transform() method of the BucketLocation transformer class groups the unique values of the candidate native location column into a smaller number of buckets by replacing every value that appears fewer than 12 times with the single value Others.

class BucketLocation(BaseEstimator, TransformerMixin):
    def __init__(self):
        # locations that are kept as-is; every other value becomes 'Others'
        self.candidate_locations = ['Chennai', 'Hyderabad', 'Bangalore', 'Gurgaon', 'Cuttack', 'Cochin',
                                    'Pune', 'Coimbatore', 'Allahabad', 'Noida', 'Visakapatinam', 'Nagercoil',
                                    'Trivandrum', 'Kolkata', 'Trichy', 'Vellore']

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X1 = X.copy()

        func = lambda x: x if x in self.candidate_locations else 'Others'

        cname = 'Candidate Native location'
        cvalue = X1[cname]
        if isinstance(X1, ks.DataFrame):
            cvalue = transformColumn(cvalue, func, str)
            X1[cname] = cvalue
        elif isinstance(X1, pd.DataFrame):
            # Series.map keeps the row index, so the new values stay aligned with their rows
            X1[cname] = cvalue.map(func)
        else:
            print('BucketLocation: unsupported dataframe: {}'.format(type(X1)))

        return X1

3.4 Parsing Interview Date

The values of the column of Date of Interview are messy in that various formats are used. For instance, not only are different delimiters used to separate day, month, and year, but different orders of day, month, and year are followed as well. This is handled by the __parseDate() method and the nested transform_date() function of the ParseInterviewDate transformer class. The overall functionality of the transform() method is to separate the interview date string into values of individual day, month, and year.

class ParseInterviewDate(BaseEstimator, TransformerMixin):
    # candidate layouts, tried in this order; {0} stands for the delimiter
    DATE_FORMATS = ('%d{0}%m{0}%Y', '%d{0}%m{0}%y',
                    '%d{0}%b{0}%Y', '%d{0}%b{0}%y',
                    '%b{0}%d{0}%Y', '%b{0}%d{0}%y')
    DELIMITERS = ('.', '/', '-', ' ')

    def __init__(self):
        pass

    def __parseDate(self, string, delimit):
        # keep only the part before '&' and remove surrounding whitespace
        string = string.split('&')[0].strip()
        for date_format in self.DATE_FORMATS:
            try:
                return datetime.strptime(string, date_format.format(delimit))
            except ValueError:
                continue
        return None

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):

        def transform_date(ditem):
            # returns (day, month, year); (0, 0, 0) marks a missing or unparsable date
            if isinstance(ditem, str) and len(ditem) > 0:
                for delimit in self.DELIMITERS:
                    if delimit in ditem:
                        d = self.__parseDate(ditem, delimit)
                        if d is not None:
                            return d.day, d.month, d.year
                        break
            return 0, 0, 0

        # Koalas uses the return type annotations to determine the type of the new columns
        def get_day(column_element) -> int:
            return int(transform_date(column_element)[0])

        def get_month(column_element) -> int:
            return int(transform_date(column_element)[1])

        def get_year(column_element) -> int:
            return int(transform_date(column_element)[2])

        func_map = {'Year': get_year, 'Month': get_month, 'Day': get_day}

        X1 = X.copy()

        if isinstance(X1, ks.DataFrame):
            for cname in func_map:
                # a new Koalas column must be derived from an existing column
                X1[cname] = X1['Date of Interview']
                X1[cname] = X1[cname].apply(func_map[cname])
        elif isinstance(X1, pd.DataFrame):
            for cname in func_map:
                X1[cname] = X1['Date of Interview'].apply(func_map[cname])
        else:
            print('ParseInterviewDate: unsupported dataframe: {}'.format(type(X1)))

        return X1
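The core idea of the date parsing above, trying several candidate formats until one matches, can be illustrated in isolation with only the standard library. The sketch below is a simplified stand-in rather than the class itself: the function name parse_date, the reduced format list, and the sample strings are assumptions chosen for illustration.

```python
from datetime import datetime

# Reduced list of candidate layouts; {0} is replaced by the detected delimiter.
FORMATS = ('%d{0}%m{0}%Y', '%d{0}%m{0}%y', '%d{0}%b{0}%Y')

def parse_date(text):
    """Return (day, month, year), or (0, 0, 0) if no format matches."""
    for delimiter in ('.', '/', '-', ' '):
        if delimiter in text:
            for fmt in FORMATS:
                try:
                    d = datetime.strptime(text.strip(), fmt.format(delimiter))
                    return d.day, d.month, d.year
                except ValueError:
                    continue  # this layout did not match, try the next one
            break  # only the first delimiter found is considered
    return 0, 0, 0

for raw in ['13.02.2015', '19/06/16', '4-Apr-2016', 'next week']:
    print(raw, '->', parse_date(raw))
```

Trying the four-digit year format before the two-digit one matters: a two-digit pattern applied to a four-digit year leaves unparsed characters and fails, so the order of the format list decides which inputs are recognized first.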

3.5 Changing Categorical Values to Uppercase and Dropping Less Useful Features

The transform() method of the FeaturesUppercase transformer class changes the values of categorical features to uppercase and, at the same time, drops less useful features.

class FeaturesUppercase(BaseEstimator, TransformerMixin):
    def __init__(self, feature_names, drop_feature_names):
        self.feature_names = feature_names
        self.drop_feature_names = drop_feature_names

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):

        func = lambda x: x.strip().upper()

        X1 = X.copy()

        for fname in self.feature_names:
            values = X1[fname]
            values = values.fillna('NaN')
            if isinstance(X1, ks.DataFrame):
                values = transformColumn(values, func, str)
            elif isinstance(X1, pd.DataFrame):
                # Series.map returns a Series with the same index, which can be assigned back safely
                values = values.map(func)
            else:
                print('FeaturesUppercase: unsupported dataframe: {}'.format(type(X1)))
            X1[fname] = values

        # drop less important features
        X1 = X1.drop(self.drop_feature_names, axis=1)

        return X1

3.6 One-Hot Encoding Categorical Values

The transform() method of the OneHotEncodeData transformer class calls the get_dummies() function of Koalas or Pandas to one-hot encode the values of the categorical features.

class OneHotEncodeData(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.one_hot_feature_names = ['Client name',
                                      'Industry',
                                      'Location',
                                      'Position to be closed',
                                      'Nature of Skillset',
                                      'Interview Type',
                                      'Gender',
                                      'Candidate Current Location',
                                      'Candidate Job Location',
                                      'Interview Venue',
                                      'Candidate Native location',
                                      'Have you obtained the necessary permission to start at the required time',
                                      'Hope there will be no unscheduled meetings',
                                      'Can I Call you three hours before the interview and follow up on your attendance for the interview',
                                      'Can I have an alternative number/ desk number. I assure you that I will not trouble you too much',
                                      'Have you taken a printout of your updated resume. Have you read the JD and understood the same',
                                      'Are you clear with the venue details and the landmark.',
                                      'Has the call letter been shared',
                                      'Marital Status']
        self.label_encoders = None
        self.one_hot_encoders = None

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X1 = X.copy()
        if isinstance(X1, ks.DataFrame):
            X1 = ks.get_dummies(X1)
        elif isinstance(X1, pd.DataFrame):
            X1 = pd.get_dummies(X1)
        else:
            print('OneHotEncodeData: unsupported dataframe: {}'.format(type(X1)))

        return X1
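To make the effect of get_dummies() concrete, here is a small Pandas-only sketch. The two-column toy DataFrame is made up for illustration; the real pipeline passes the full preprocessed dataset.

```python
import pandas as pd

# Toy input: one categorical column and one numeric column.
df = pd.DataFrame({'Gender': ['MALE', 'FEMALE', 'MALE'],
                   'Year': [2015, 2016, 2016]})

# Text columns are expanded into one indicator column per distinct value;
# numeric columns such as Year are passed through unchanged.
encoded = pd.get_dummies(df)
print(encoded.columns.tolist())
print(encoded)
```

Each distinct value becomes its own column, which is why the high-cardinality skillset and location columns are bucketed before this step.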

3.7 Combining Transformers into Pipeline

All of the data preprocessing transformers are combined into a scikit-learn pipeline as follows in the PreprocessData() method of the PredictInterview class (see Section 4.3 for details). The fit() and transform() methods of these transformers will be executed sequentially once the fit_transform() method of the pipeline object is called.

self.pipeline = Pipeline([
    ('bucket_skillset', BucketSkillset()),
    ('bucket_location', BucketLocation()),
    ('parse_interview_date', ParseInterviewDate()),
    ('features_to_uppercase', FeaturesUppercase(self.feature_names, self.drop_feature_names)),
    ('one_hot_encoder', self.oneHotEncoder)
])
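The sequential behavior of a scikit-learn pipeline built from custom transformers can be seen in a stripped-down form. The following sketch uses two hypothetical transformers, StripText and UppercaseText, which are invented for this illustration and follow the same BaseEstimator/TransformerMixin pattern as the classes above.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class StripText(BaseEstimator, TransformerMixin):
    """Hypothetical transformer: removes surrounding whitespace in one column."""
    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X1 = X.copy()
        X1[self.column] = X1[self.column].str.strip()
        return X1

class UppercaseText(BaseEstimator, TransformerMixin):
    """Hypothetical transformer: converts one column to uppercase."""
    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X1 = X.copy()
        X1[self.column] = X1[self.column].str.upper()
        return X1

pipeline = Pipeline([
    ('strip', StripText('Gender')),
    ('upper', UppercaseText('Gender')),
])

# Each step receives the output of the previous step.
out = pipeline.fit_transform(pd.DataFrame({'Gender': [' male', 'Female ']}))
print(out['Gender'].tolist())
```

TransformerMixin supplies fit_transform() on top of fit() and transform(), which is what allows such classes to be chained in a Pipeline.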

4. Modeling

Once the dataset has been prepared, the next step is modeling. The main goals of modeling include:

  • Identify machine learning model
  • Train machine learning model
  • Tune the hyper-parameters of machine learning model

4.1 Identifying Machine Learning Model

There are three major high-level types of machine learning and deep learning algorithms/models:

  • supervised machine learning and deep learning
  • unsupervised machine learning and deep learning
  • reinforcement learning

Supervised machine learning and deep learning can be divided into subtypes such as regression and classification. Each subtype includes various machine learning and deep learning algorithms/models. For instance, supervised machine learning classification models include Decision Tree classifier, Random Forest classifier, GBM classifier, etc.

Generally speaking, given a business problem, there are many different types of models that can be used as possible solutions. These different models need to be compared to identify the most promising one as the solution to the target business problem. Thus model identification cannot be done in isolation. It depends on model training and on the evaluation and comparison of model performance metrics.

In this article we simply select the scikit-learn RandomForestClassifier model for demonstration purposes.

4.2 Training Model and Tuning Hyper-Parameters

Once a model (e.g., RandomForestClassifier) is identified, typically there are multiple hyper-parameters to be tuned. A hyper-parameter is a parameter that must be set before model training can begin and whose value does not change during model training. For example, the Random Forest Classifier has multiple hyper-parameters such as number of estimators, max depth, etc.

The scikit-learn GridSearchCV class is a popular tool for searching for the best combination of hyper-parameters of a given model by automatically training the model many times. Each execution corresponds to a unique combination of the selected hyper-parameter values. The GridSearch class uses GridSearchCV to find the best combination of number of estimators and max depth:

class GridSearch(object):
    def __init__(self, cv=10):
        # single values keep the demo fast; the wider ranges are noted in the comments
        self.grid_param = [
            {'n_estimators': range(68, 69),  # range(60, 70)
             'max_depth': range(8, 9)}       # range(5, 10)
        ]
        self.cv = cv
        self.scoring_function = make_scorer(f1_score, greater_is_better=True)
        self.gridSearchCV = None

    def fit(self, X, y):
        rfc = RandomForestClassifier()
        self.gridSearchCV = GridSearchCV(rfc, self.grid_param, cv=self.cv, scoring=self.scoring_function)
        self.gridSearchCV.fit(X, y)
        return self.gridSearchCV.best_estimator_
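To get a feel for the cost of a grid search, the combinations can be enumerated by hand. The sketch below uses only the standard library and assumes the wider ranges mentioned in the comments of the grid (range(60, 70) and range(5, 10)) with 10-fold cross-validation; it illustrates the counting only and is not how GridSearchCV is implemented.

```python
from itertools import product

# Wider grid from the comments in the GridSearch class (an assumption for this sketch).
grid = {'n_estimators': range(60, 70), 'max_depth': range(5, 10)}
cv = 10

names = sorted(grid)
# One dictionary per unique combination of hyper-parameter values.
combinations = [dict(zip(names, values))
                for values in product(*(grid[name] for name in names))]

print(len(combinations))        # number of candidate settings
print(len(combinations) * cv)   # model fits needed for cross-validation
print(combinations[0])          # first candidate setting
```

With its default settings, GridSearchCV additionally refits the best candidate once on the whole training set, so the total number of fits is one higher than the cross-validation count.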

4.3 Tracking Model Hyper-Parameters and Performance Metrics

One core function of mlflow is to track and compare the hyper-parameters and performance metrics of different model executions.

The mlFlow() method of the PredictInterview class loads and preprocesses the data, trains a model, uses the trained model to predict results, obtains various model performance metrics, and then calls the mlflow API to track both the hyper-parameters and the performance metrics, while also logging the trained model to a file for later use such as deployment.

def mlFlow(self):
    np.random.seed(40)
    with mlflow.start_run():
        self.loadData()
        self.PreprocessData()
        self.trainModel()
        self.predictClasses()
        accuracy, f1, rmse, mae, r2 = self.getModelMetrics()

        # track the best hyper-parameters found by the grid search
        best_params = self.gridSearch.gridSearchCV.best_params_
        mlflow.log_param("n_estimators", best_params["n_estimators"])
        mlflow.log_param("max_depth", best_params["max_depth"])

        # track the performance metrics
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("f1", f1)

        # log the trained model for later use such as deployment
        mlflow.sklearn.log_model(self.rfc, "random_forest_model")

A Jupyter notebook of the PredictInterview class below and all the other pieces of source code in this article are available in Github [6].

class PredictInterview(object):
    def __init__(self, use_koalas=True):
        self.use_koalas = use_koalas
        self.dataset_file_name = 'Interview_Attendance_Data.csv'
        self.feature_names = ['Date of Interview',
                              'Client name',
                              'Industry',
                              'Location',
                              'Position to be closed',
                              'Nature of Skillset',
                              'Interview Type',
                              'Gender',
                              'Candidate Current Location',
                              'Candidate Job Location',
                              'Interview Venue',
                              'Candidate Native location',
                              'Have you obtained the necessary permission to start at the required time',
                              'Hope there will be no unscheduled meetings',
                              'Can I Call you three hours before the interview and follow up on your attendance for the interview',
                              'Can I have an alternative number/ desk number. I assure you that I will not trouble you too much',
                              'Have you taken a printout of your updated resume. Have you read the JD and understood the same',
                              'Are you clear with the venue details and the landmark.',
                              'Has the call letter been shared', 'Marital Status']

        if self.use_koalas:
            self.drop_feature_names = [
                'Name(Cand ID)',
                'Date of Interview',
                '_c22',
                '_c23',
                '_c24',
                '_c25',
                '_c26']
        else:  # use Pandas
            # the ID and raw date columns are dropped here as well, matching the Koalas
            # branch and Section 3; the original listing omitted them for Pandas
            self.drop_feature_names = [
                'Name(Cand ID)',
                'Date of Interview',
                'Unnamed: 22',
                'Unnamed: 23',
                'Unnamed: 24',
                'Unnamed: 25',
                'Unnamed: 26']

        self.dataset = None
        self.rfc = None
        self.gridSearch = None
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None
        self.y_pred = None
        self.X_clean = None
        self.y_clean = None
        self.X_train_encoded = None
        self.X_test_encoded = None
        self.y_train_encoded = None
        self.accuracy_score = None
        self.f1_score = None
        self.oneHotEncoder = None
        self.X_test_name_ids = None
        self.pipeline = None


    def loadData(self, path=None):
        if path is not None:
            path = os.path.join(path, self.dataset_file_name)
        else:
            path = self.dataset_file_name

        if self.use_koalas:
            dataset = ks.read_csv(path)
        else:
            dataset = pd.read_csv(path)

        # shuffle data
        self.dataset = dataset.sample(frac=1.0)

        return self.dataset

    def PreprocessData(self):
        y = self.dataset['Observed Attendance']  # extract labels y
        if self.use_koalas:
            X = self.dataset.drop('Observed Attendance')  # extract features X
        else:
            X = self.dataset.drop(['Observed Attendance'], axis=1)

        self.oneHotEncoder = OneHotEncodeData()

        self.pipeline = Pipeline([
            ('bucket_skillset', BucketSkillset()),
            ('bucket_location', BucketLocation()),
            ('parse_interview_date', ParseInterviewDate()),
            ('features_to_uppercase', FeaturesUppercase(self.feature_names, self.drop_feature_names)),
            ('one_hot_encoder', self.oneHotEncoder)
        ])

        X_1hot = self.pipeline.fit_transform(X)

        # fill up missing labels and then change labels to uppercase
        y = y.fillna('NaN')

        func = lambda x: x.strip().upper()
        if self.use_koalas:
            y_uppercase = transformColumn(y, func, str)
        else:
            # Series.map keeps the shuffled index, so the masks below stay aligned with X_1hot
            y_uppercase = y.map(func)

        # separate labeled records from unlabeled records
        self.X_train_encoded = X_1hot[y_uppercase != 'NAN']
        self.X_test_encoded = X_1hot[y_uppercase == 'NAN']

        # save Names/ID for reporting later on
        self.X_test_name_ids = self.dataset['Name(Cand ID)'].loc[y_uppercase == 'NAN']

        y_train = y_uppercase.loc[y_uppercase != 'NAN']

        # encode labels as follows: 0 - NO, 1 - YES (unlabeled rows were separated above)
        func = lambda x: 1 if x == 'YES' else 0
        if self.use_koalas:
            y = transformColumn(y_train, func, int)
        else:
            y = y_train.map(func)

        self.y_train_encoded = y

        self.X_clean = X_1hot
        self.y_clean = y_uppercase

        return None

    def __splitData(self):
        if self.use_koalas:
            X_train_encoded = self.X_train_encoded.to_numpy()
            y_train_encoded = self.y_train_encoded.to_numpy()
        else:
            X_train_encoded = self.X_train_encoded.values
            y_train_encoded = self.y_train_encoded.values

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X_train_encoded,
                                                                                y_train_encoded,
                                                                                test_size=0.25, random_state=0)
        return (self.X_train, self.X_test, self.y_train, self.y_test)

    def trainModel(self):
        X_train, X_test, y_train, y_test = self.__splitData()
        self.gridSearch = GridSearch()
        self.rfc = self.gridSearch.fit(X_train, y_train)
        return self.rfc

    def predictClasses(self):
        if (self.rfc is None):
            print("No trained model available, please train a model first!")
            return None

        self.y_pred = self.rfc.predict(self.X_test)
        return self.y_pred

    def getModelMetrics(self):
        if (self.y_test is None or self.y_pred is None):
            print('Failed to get model performance metrics because y_test is null or y_pred is null!')
            return None

        # classification metrics are computed on the predicted classes
        self.accuracy_score = accuracy_score(self.y_test, self.y_pred)
        self.f1_score = f1_score(self.y_test, self.y_pred)

        # regression-style metrics are computed on the predicted probability of class 1
        pred = self.predictAttendanceProbability(self.X_test)[:, 1]
        actual = self.y_test.astype(float)

        self.rmse_score = np.sqrt(mean_squared_error(actual, pred))
        self.mae_score = mean_absolute_error(actual, pred)
        self.r2_score = r2_score(actual, pred)

        return (self.accuracy_score, self.f1_score, self.rmse_score, self.mae_score, self.r2_score)

    def predictNullAttendanceProbability(self):
        y_pred = self.rfc.predict_proba(self.X_test_encoded.to_numpy())
        return y_pred

    def predictNullAttendanceClasses(self):
        y_pred = self.rfc.predict(self.X_test_encoded.to_numpy())
        return y_pred

    def predictAttendanceProbability(self, X):
        y_pred = self.rfc.predict_proba(X)
        return y_pred

    def predictAttendanceClass(self, X):
        y_pred = self.rfc.predict(X)
        return y_pred

    def mlFlow(self):
        np.random.seed(40)
        with mlflow.start_run():
            self.loadData()
            self.PreprocessData()
            self.trainModel()
            self.predictClasses()
            accuracy, f1, rmse, mae, r2 = self.getModelMetrics()

            # track the best hyper-parameters found by the grid search
            best_params = self.gridSearch.gridSearchCV.best_params_
            mlflow.log_param("n_estimators", best_params["n_estimators"])
            mlflow.log_param("max_depth", best_params["max_depth"])

            # track the performance metrics
            mlflow.log_metric("rmse", rmse)
            mlflow.log_metric("r2", r2)
            mlflow.log_metric("mae", mae)
            mlflow.log_metric("accuracy", accuracy)
            mlflow.log_metric("f1", f1)

            # log the trained model for later use such as deployment
            mlflow.sklearn.log_model(self.rfc, "random_forest_model")

The code below shows how to instantiate an object of the PredictInterview class and then call its mlFlow() method.

predictInterview = PredictInterview(use_koalas=True)
predictInterview.mlFlow()
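The getModelMetrics() method computes accuracy and F1 on the predicted classes, but RMSE, MAE, and R2 on the predicted probability of attendance compared with the 0/1 label. The following hand-computed sketch shows what those three numbers mean on a tiny made-up example; the arrays are invented for illustration, and the 0.5 cut-off used for accuracy is an assumption of this sketch.

```python
import numpy as np

# Made-up labels (1 = attended) and predicted probabilities of attendance.
actual = np.array([1.0, 0.0, 1.0, 1.0])
pred_prob = np.array([0.9, 0.2, 0.6, 0.4])

errors = actual - pred_prob

rmse = np.sqrt(np.mean(errors ** 2))     # root mean squared error
mae = np.mean(np.abs(errors))            # mean absolute error
# R2 compares the squared error with that of always predicting the label mean.
r2 = 1.0 - np.sum(errors ** 2) / np.sum((actual - actual.mean()) ** 2)

# Accuracy after converting probabilities to classes with a 0.5 cut-off.
accuracy = np.mean((pred_prob >= 0.5) == (actual == 1.0))

print(rmse, mae, r2, accuracy)
```

These formulas follow the standard definitions that mean_squared_error, mean_absolute_error, and r2_score implement, so the same inputs should give matching values when passed to scikit-learn.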

4.4 Comparing Model Hyper-Parameters and Performance Metrics

Once the hyper-parameters and performance metrics of a model have been tracked in mlflow, we can use a terminal or Jupyter notebook to start the mlflow UI (User Interface) as follows to view the history of model executions:

!mlflow ui # for jupyter notebook

Assuming that the mlflow UI starts on a local machine, the following IP address and port number can be used to view the results in a Web browser:

http://127.0.0.1:5000

The following picture is a snapshot of a model execution history in the mlflow UI:

Figure 2: Tracking hyper-parameters and metrics in mlflow UI

5. Evaluation

Once a machine learning model has been trained with the expected performance, the next step is to assess the prediction results of the model in a controlled, close-to-real setting to gain confidence that the model is valid, reliable, and meets the business requirements for deployment.

As an example, for the Kaggle interview attendance project, one possible method of evaluation is to use mlflow to deploy the model as a Web service and then develop a client program to call the model Web service to get the prediction results of a testing dataset after going through data preparation. These prediction results can then be used to generate a report (e.g., a table or csv file) for recruitment industry domain experts to review.

For demonstration purposes, the following code uses the prediction results with a threshold of 0.5 to generate both a probability and a prediction for each candidate whose value in the “Observed Attendance” column is missing, and collects the results in a Pandas DataFrame.

pred_probs   = predictInterview.predictNullAttendanceProbability()
pred_classes = predictInterview.predictNullAttendanceClasses()
x = predictInterview.X_test_name_ids.to_numpy()
z = zip(x, pred_probs, pred_classes)
answers = ('no', 'yes')
result = [[x1, p1[1], answers[c]] for x1, p1, c in z]
result_df = pd.DataFrame(np.array(result), columns=['Names/ID', 'Probability', 'Yes/No'])
result_df.to_csv('interview_prediction.csv')
result_df.head(15)
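The code above relies on the layout of the predict_proba() output: one row per candidate, with the probability of class 0 (no) in the first column and of class 1 (yes) in the second. The following plain-Python sketch mimics that layout with made-up numbers and hypothetical candidate names to show how a probability and a yes/no answer are paired; the rule used here (strictly greater than 0.5 means yes) is an assumption of the sketch.

```python
# Made-up probabilities in the [P(no), P(yes)] layout, one row per candidate.
pred_probs = [[0.80, 0.20], [0.35, 0.65], [0.50, 0.50]]
names = ['Candidate 1', 'Candidate 2', 'Candidate 3']  # hypothetical IDs

answers = ('no', 'yes')
threshold = 0.5

# Pair each candidate with P(yes) and the answer implied by the threshold.
rows = [(name, p[1], answers[int(p[1] > threshold)])
        for name, p in zip(names, pred_probs)]

for row in rows:
    print(row)
```

A candidate sitting exactly on the threshold is reported as no in this sketch, which is worth keeping in mind when domain experts review borderline cases.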

The following are the first 15 rows of the DataFrame:

6. Deployment

Once the model evaluation concludes that the model is ready for deployment, the final step is to deploy an evaluated model into a production system. As described in the book Data Science for Business, the specifics of deployment depend on the target production system.

Taking the Kaggle interview attendance project as an example, one possible scenario is to deploy the model as a Web service on a server, which can be called by other components in a target production system to get prediction results for assisting job interview arrangement. In a more complicated scenario, where the target production system is developed in a programming language (e.g., Java) that is different from the modeling language (e.g., Python), the model will likely need to be reimplemented in the target programming language as a component of the production system.

6.1 Deploying Model as a Web Service

As described before, a trained model has been logged into a file during the process of tracking model executions in mlflow. The following screen snapshot shows the information of a logged model:

Figure 3: Logging trained model in mlflow UI

Similarly to the mlflow tutorial, the following command uses the built-in mlflow functionality to serve a logged model as a Web service:

mlflow models serve -m /Users/xyz/machine-learning-spark/mlruns/0/258301f3ac5f42fb99e885968ff17c2a/artifacts/random_forest_model -p 1234

6.2 Calling Model Web Service to Predict Outcome

For simplicity, in this section, it is assumed that the test_df is a Pandas DataFrame with only one row of testing data (an interview attendance feature vector):

test_df.head() 

The following code can be used to send the row of testing data to the model Web service to obtain the predicted interview attendance (1 - Yes, 0 - No):

import requests
import json

headers = {'Content-Type': 'application/json',
           'Format': 'pandas-split'}
url = 'http://127.0.0.1:1234/invocations'

headers_json_str = json.dumps(headers)
headers_json_obj = json.loads(headers_json_str)
data_json_obj = test_df.to_json(orient='split')
response = requests.post(url, data=data_json_obj, headers=headers_json_obj)
response.text
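To see what is actually sent in the request body, the split-oriented JSON produced by to_json() can be inspected locally without a running server. The sketch below uses a made-up one-row DataFrame with two hypothetical feature columns; the real test_df would contain every one-hot encoded column that the model was trained on.

```python
import json
import pandas as pd

# Hypothetical one-row feature vector (the real one has many more columns).
test_df = pd.DataFrame({'Year': [2016], 'Gender_MALE': [1]})

payload = test_df.to_json(orient='split')
parsed = json.loads(payload)

# The split orientation separates column names, row labels, and row values.
print(sorted(parsed.keys()))
print(parsed['columns'])
print(parsed['data'])
```

Keeping the column names in the payload lets the serving side rebuild a DataFrame with the same columns as the training data, so the column order and names in test_df should match what the model was trained on.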

Summary

In this article, I used a challenging, close-to-real dataset, the Kaggle Interview Attendance Problem, to demonstrate an end-to-end process of developing an Object-Oriented machine learning pipeline in Python for both Pandas and Koalas DataFrames by combining Pandas and Koalas DataFrames with the scikit-learn pipeline API and mlflow. This end-to-end development process follows the Cross-industry standard process for data mining. A brief description and sample implementation code are provided for each of the phases (except for the first phase) of the standard process. A Jupyter notebook and corresponding Python source code file are available in Github [6].

References

[1] Provost, F., Fawcett, T. (2013). Data Science for Business, O’Reilly, July 2013

[2] Geron, A. (2017). Hands-On Machine Learning with Scikit-Learn & TensorFlow, O’Reilly, March 2017

[3] mlflow 1.3.0 tutorial: https://www.mlflow.org/docs/latest/tutorial.html

[4] The Interview Attendance Problem: https://www.kaggle.com/vishnusraghavan/the-interview-attendance-problem/data

[5] Zhang, Y. (2019). Python Data Preprocessing Using Pandas DataFrame, Spark DataFrame, and Koalas DataFrame: https://towardsdatascience.com/python-data-preprocessing-using-pandas-dataframe-spark-dataframe-and-koalas-dataframe-e44c42258a8f

[6] Zhang, Y. (2019). Jupyter notebook in Github

DISCLOSURE STATEMENT: © 2019 Capital One. Opinions are those of the individual author. Unless noted otherwise in this post, Capital One is not affiliated with, nor endorsed by, any of the companies mentioned. All trademarks and other intellectual property used or displayed are property of their respective owners.


Senior Data Scientist at Wavicle Data Solutions. He was a Senior Data Scientist at SMS Assist, a Senior Data Engineer at Capital One, and a DMTS at Motorola.