Predict Customer Churn using PySpark Machine Learning

Marvin Lüthe
Towards Data Science
13 min read · Nov 5, 2019

Predicting customer churn is a challenging and common problem that data scientists encounter these days. The ability to predict that a particular customer is at a high risk of churning, while there is still time to do something about it, represents a huge additional potential revenue source for every customer-facing business.

In this post, I will guide you through the creation of a machine learning solution which will be able to predict customer churn. This solution will be realized with Apache Spark. Apache Spark is a popular distributed data processing engine which can be deployed in a variety of ways, providing native bindings for Java, Scala, Python and R. It provides a stack of libraries including Spark SQL, Spark Streaming, MLlib for machine learning and GraphX for graph processing. For this project, we will focus on the machine learning library MLlib. We will use the Python API for Spark known as PySpark.

In this article, you will learn how to:

  • load large datasets into Spark and manipulate them using Spark SQL and Spark Dataframes to engineer relevant features for predicting customer churn,
  • use the machine learning APIs within Spark ML to build and tune models.

Introduction

Imagine you are working on the data team for a popular digital music service similar to Spotify. Let’s call it Sparkify. The users stream their favorite songs every day, either using the free tier that places advertisements between the songs or using the premium subscription model, where they stream music ad-free but pay a monthly flat rate. Users can upgrade, downgrade or cancel the service at any time. Every time a user interacts with the service, whether playing songs, logging out or liking a song with a thumbs-up, data is generated. All this data contains the key insights for keeping the users happy and helping the business thrive. It’s our job on the data team to predict which users are at risk of canceling their accounts. If we can accurately identify these users before they leave, our business can offer them discounts and incentives, potentially saving millions in revenue.

Import Libraries and Set-Up Spark

I used IBM Watson Studio (Default Spark Python 3.6 XS, one driver, two executors, Spark Version 2.3) to work on this project. The interaction with PySpark dataframes is not as convenient as it is with pandas dataframes. This is why I recommend installing and importing pixiedust:

!pip install --upgrade pixiedust
import pixiedust

pixiedust is an open-source Python helper library that works as an add-on to Jupyter notebooks and greatly improves the way we can interact with PySpark dataframes.

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import datetime

from sklearn.metrics import f1_score, recall_score, precision_score
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, DateType, FloatType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, GBTClassifier, LinearSVC

Create a Spark session and read in the Sparkify dataset:

# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Read in the dataset
df = spark.read.json('medium-sparkify-event-data.json')

pixiedust now comes in handy, and we can display the first entries of the dataframe.

display(df)
Figure 1

Have a peek at the schema:

df.printSchema()
Figure 2

The dataset holds information about how the users interact with the streaming platform: which songs they listened to, which pages they visited, their account status, etc. Every user interaction is stored with a UNIX timestamp, which makes it possible to analyze changes in user behaviour over time. We will take advantage of that piece of information during the feature engineering process later.
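As a small aside (not needed for the rest of the pipeline), the millisecond timestamps can be converted to readable datetimes directly in Spark, for example:

# ts is a UNIX timestamp in milliseconds, so divide by 1000 before converting
df.select('ts', F.from_unixtime(F.col('ts') / 1000).alias('datetime')).show(5, truncate=False)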

Exploratory Data Analysis

Next, we will perform EDA by doing basic manipulations within PySpark.

In order to understand how the users interact with the music service, we might want to see which pages they view the most.

df.groupBy('page').count().sort(F.desc('count')).show()
Figure 3

We can clearly see that “NextSong” is the most popular page view, which makes perfect sense for a music service. However, there are many other page views which are going to be important for engineering relevant features from this raw dataset. We use the page “Cancellation Confirmation”, with 99 visits, to create the label for the machine learning models.

flag_cancellation_event = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn('label', flag_cancellation_event('page'))

Based on the UNIX timestamp ts we can calculate statistics by hour.

get_hour = F.udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour, IntegerType())
df = df.withColumn('hour', get_hour(df.ts))

Since matplotlib does not work with PySpark dataframes, we convert the aggregated counts to a pandas dataframe and plot the user activity by hour.

# Count the events per hour
songs_by_hour = df.groupBy('hour').count().orderBy(df.hour)
songs_by_hour_pd = songs_by_hour.toPandas()
songs_by_hour_pd.hour = pd.to_numeric(songs_by_hour_pd.hour)
# Plot the events per hour aggregation
plt.scatter(songs_by_hour_pd['hour'], songs_by_hour_pd['count'])
plt.xlim(-1, 24)
plt.ylim(0, 1.2 * max(songs_by_hour_pd['count']))
plt.xlabel('Hour')
plt.ylabel('Events');
Figure 4

Feature Engineering

Feature engineering plays a key role in big data analytics. Machine learning and data mining algorithms cannot work without data. Little can be achieved if there are few features to represent the underlying data objects, and the quality of results of those algorithms largely depends on the quality of the available features.
Accordingly, we start building out the features we find promising to train the model on.

To this end, we create a new PySpark dataframe feature_df from scratch, each row representing a user. We will create features from the dataframe df and join those sequentially to the dataframe feature_df.

Based on the column label in df we can separate the churned users from the rest.

churned_collect = df.where(df.label==1).select('userId').collect()
churned_users = set([int(row.userId) for row in churned_collect])
all_collect = df.select('userId').collect()
all_users = set([int(row.userId) for row in all_collect])
feature_df = spark.createDataFrame(all_users, IntegerType()).withColumnRenamed('value', 'userId')

Encode Label

# Create label column
create_churn = F.udf(lambda x: 1 if x in churned_users else 0, IntegerType())
feature_df = feature_df.withColumn('label', create_churn('userId'))

Encode Gender and Account Level as Features

# Create binary gender column
convert_gender = F.udf(lambda x: 1 if x == 'M' else 0, IntegerType())
df = df.withColumn('GenderBinary', convert_gender('Gender'))

# Add gender as feature
feature_df = feature_df.join(df.select(['userId', 'GenderBinary']), 'userId') \
    .dropDuplicates(subset=['userId']) \
    .sort('userId')

# Create binary level column
convert_level = F.udf(lambda x: 1 if x == 'free' else 0, IntegerType())
df = df.withColumn('LevelBinary', convert_level('Level'))

# Add customer level as feature
feature_df = feature_df.join(df.select(['userId', 'ts', 'LevelBinary']), 'userId') \
    .sort(F.desc('ts')) \
    .dropDuplicates(subset=['userId']) \
    .drop('ts')

Encode Page Views as Features

Every time a user interacts with the platform, data is generated. This means that we know exactly what each user experienced during the period of this data extract. My approach is to divide the pages into categories:

  • Neutral pages: “Cancel”, “Home”, “Logout”, “Save Settings”, “About”, “Settings”
  • Negative pages: “Thumbs Down”, “Roll Advert”, “Help”, “Error”
  • Positive pages: “Add to Playlist”, “Add Friend”, “NextSong”, “Thumbs Up”
  • Downgrade pages: “Submit Downgrade”, “Downgrade”
  • Upgrade pages: “Submit Upgrade”, “Upgrade”

The reasoning behind this approach is that we can count how often a user had an interaction with, for example, a positive page. We could have done this for every page separately, but that would result in a much larger feature space.

Let’s put this into code:

# Create a dictionary which maps page view categories to PySpark dataframes
pages = {}
pages['neutralPages'] = df.filter((df.page == 'Cancel') | (df.page == 'Home') | (df.page == 'Logout') \
    | (df.page == 'Save Settings') | (df.page == 'About') | (df.page == 'Settings'))
pages['negativePages'] = df.filter((df.page == 'Thumbs Down') | (df.page == 'Roll Advert') | (df.page == 'Help') \
    | (df.page == 'Error'))
pages['positivePages'] = df.filter((df.page == 'Add to Playlist') | (df.page == 'Add Friend') | (df.page == 'NextSong') \
    | (df.page == 'Thumbs Up'))
pages['downgradePages'] = df.filter((df.page == 'Submit Downgrade') | (df.page == 'Downgrade'))
pages['upgradePages'] = df.filter((df.page == 'Upgrade') | (df.page == 'Submit Upgrade'))

# Loop through page view categories and aggregate the counts by user
for key, value in pages.items():
    value_df = value.select('userId') \
        .groupBy('userId') \
        .agg({'userId': 'count'}) \
        .withColumnRenamed('count(userId)', key)

    # Add page view aggregations as features
    feature_df = feature_df.join(value_df, 'userId', 'left').sort('userId') \
        .fillna({key: '0'})

Next, we will calculate how many days the users interacted with the platform:
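Note that the following snippets use a date column that is not created in the code shown so far. A minimal sketch of how it can be derived from the millisecond timestamp ts (the original notebook may do this slightly differently):

# Derive a calendar date from the millisecond UNIX timestamp
get_date = F.udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).date(), DateType())
df = df.withColumn('date', get_date(df.ts))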

# Create dataframe with users and date counts
dateCount_df = df.select('userId', 'date') \
    .groupby('userId') \
    .agg(F.countDistinct('date')) \
    .withColumnRenamed('count(DISTINCT date)', 'dateCount')

# Add date count as feature
feature_df = feature_df.join(dateCount_df, 'userId', 'left').sort('userId') \
    .fillna({'dateCount': '1'})

These page view features are absolute values counting the number of occurrences. However, this can lead to misleading results if some users signed up towards the end of the data extract while others used the platform from the very beginning. To make the aggregated results comparable, we divide them by the user-specific time window, obtaining counts per day. I skip the code here; the full code is available on GitHub.
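For illustration, a minimal sketch of this normalization could look like the following; the time-window calculation and the column names are assumptions based on the feature names used later, not necessarily the original implementation:

# Approximate each user's active time window in days (at least one day to avoid division by zero)
active_time_df = df.groupBy('userId') \
    .agg(((F.max('ts') - F.min('ts')) / (1000 * 60 * 60 * 24)).alias('activeDays'))
active_time_df = active_time_df.withColumn('UserActiveTime', F.greatest(F.col('activeDays'), F.lit(1.0))).drop('activeDays')
feature_df = feature_df.join(active_time_df, 'userId', 'left')

# Turn the absolute counts into counts per day
for col_name in ['neutralPages', 'negativePages', 'positivePages', 'downgradePages', 'upgradePages', 'dateCount']:
    feature_df = feature_df.withColumn(col_name + 'Normalized', F.col(col_name) / F.col('UserActiveTime'))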

Encode User Activity over Time as Feature

Another promising feature is how the user interactions change over time. First, we calculate the number of user interactions per day. Second, we fit one linear regression model per user using numpy.polyfit. We will take the slopes of these lines, remove outliers and plug the scaled slopes as features into the classification algorithms (a sketch of this last step follows the code below).

Figure 5
# Create dataframe with users and their activity per day
activity_df = df.select('userId', 'date') \
    .groupby('userId', 'date') \
    .count()

# Initialize slopes
slopes = []
for user in all_users:
    # Create a pandas dataframe for the slope calculation
    activity_pandas = activity_df.filter(activity_df['userId'] == user).sort(F.asc('date')).toPandas()
    if activity_pandas.shape[0] == 1:
        slopes.append(0)
        continue
    # Fit a line through the user activity counts and retrieve its slope
    slope = np.polyfit(activity_pandas.index, activity_pandas['count'], 1)[0]
    slopes.append(slope)
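The slopes still have to be attached to feature_df and cleaned of outliers. A minimal sketch of this step (the clipping bounds are an illustrative assumption, not the values used in the original notebook):

# Attach the per-user slopes to feature_df (all_users and slopes are aligned by the loop above)
slope_df = spark.createDataFrame([(int(u), float(s)) for u, s in zip(all_users, slopes)], ['userId', 'Slope'])
feature_df = feature_df.join(slope_df, 'userId', 'left').fillna({'Slope': 0})

# Clip extreme slopes to limit the influence of outliers before scaling
feature_df = feature_df.withColumn('Slope', F.when(F.col('Slope') > 10, 10).when(F.col('Slope') < -10, -10).otherwise(F.col('Slope')))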

Feature Scaling, Merge Columns to one Features Vector

As a final step of the feature engineering process, we iterate over the created features and scale them using MinMaxScaler. Afterwards, we put the features into one vector so that we can plug it into the pyspark.ml algorithms.

# UDF for converting column type from vector to double type
unlist = F.udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

# Iterate over columns to be scaled
for i in ['neutralPagesNormalized', 'negativePagesNormalized', 'positivePagesNormalized', 'downgradePagesNormalized', 'upgradePagesNormalized', 'dateCountNormalized', 'hourAvg', 'UserActiveTime', 'Slope']:
    # VectorAssembler Transformation - Convert column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    feature_df = pipeline.fit(feature_df).transform(feature_df) \
        .withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

# Merge columns to one feature vector
assembler = VectorAssembler(inputCols=['neutralPagesNormalized_Scaled', 'negativePagesNormalized_Scaled', 'positivePagesNormalized_Scaled', 'downgradePagesNormalized_Scaled', 'upgradePagesNormalized_Scaled', 'dateCountNormalized_Scaled', 'hourAvg_Scaled', 'UserActiveTime_Scaled', 'Slope_Scaled', 'LevelBinary', 'GenderBinary'], outputCol='features')
feature_df = assembler.transform(feature_df)

Here is the resulting schema of feature_df:

Figure 6

The features column holds the combined features vector for each user:

Figure 7

Machine Learning Modeling

After the creation of the features, we can move on and split the full dataset into training and testing sets. We will test several common machine learning methods used for classification tasks. The performance of the models will be evaluated and their parameters tuned accordingly. Based on the F1-Score, Precision and Recall, we will determine the winning model.

Split the features dataframe into training and testing and check for class imbalance.

train, test = feature_df.randomSplit([0.7, 0.3], seed=42)

plt.hist(feature_df.toPandas()['label'])
plt.show()
Figure 8

It is crucial to check for potential class imbalance in the data. It is extremely common in practice and many classification learning algorithms have low predictive accuracy for the infrequent class.
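The exact class counts on the training set can also be checked directly in Spark:

# Exact counts per class in the training data
train.groupBy('label').count().show()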

Machine Learning Hyperparameter Tuning and Evaluation

Spark’s MLlib supports tools for model selection such as CrossValidator. This requires the following:

  • Estimator: algorithm or pipeline
  • Set of parameters: parameter grid to search over
  • Evaluator: metric to measure how well a fitted model does on the test dataset.

The estimators and parameters will be set for each classifier specifically.

For evaluation, we take the BinaryClassificationEvaluator which supports the “areaUnderROC” and the “areaUnderPR”. Since we have a class imbalance in the data, we take the “areaUnderPR” as our evaluation metric because the PR curve is more informative in this case (cf. http://pages.cs.wisc.edu/~jdavis/davisgoadrichcamera2.pdf).

Since the class BinaryClassificationEvaluator from pyspark.ml.evaluation only provides the metrics “areaUnderPR” and “areaUnderROC”, we will compute the F1-Score, Precision and Recall with sklearn.

evaluator = BinaryClassificationEvaluator(metricName = 'areaUnderPR')

Logistic Regression

Logistic regression is essentially a linear regression model that explains the relationship between a dependent variable and one or more nominal, ordinal, interval or ratio-level independent variables. The difference is that the output of linear regression is a number with a real-world meaning (a label), while the output of logistic regression is a number representing the probability of an event happening (e.g. the probability of a customer deleting their account).

Before instantiating the logistic regression object, we calculate a balancing ratio to account for the class imbalance. We use the weightCol parameter to re-weight the training instances according to the pre-calculated ratio, effectively “under-sampling” the negative class and “over-sampling” the positive class: the logistic loss objective treats the negative class (label == 0) with a lower weight.

# Calculate a balancing ratio to account for the class imbalance
balancing_ratio = train.filter(train['label'] == 0).count() / train.count()
train = train.withColumn('classWeights', F.when(train.label == 1, balancing_ratio).otherwise(1 - balancing_ratio))

# Create a logistic regression object
lr = LogisticRegression(featuresCol='features', labelCol='label', weightCol='classWeights')

For logistic regression, pyspark.ml supports extracting a trainingSummary of the model over the training set. This does not work with a fitted CrossValidator object, which is why we take it from a model fitted without parameter tuning.

lrModel = lr.fit(train)
trainingSummary = lrModel.summary

We can use it to plot the threshold-Recall curve, threshold-Precision curve, Recall-Precision curve and threshold-F-Measure curve to examine how our model performs. This threshold is a prediction threshold which determines the predicted class based on the probabilities that the model outputs. Model optimization includes tuning this threshold.

# Plot the threshold-recall curve
tr = trainingSummary.recallByThreshold.toPandas()
plt.plot(tr['threshold'], tr['recall'])
plt.xlabel('Threshold')
plt.ylabel('Recall')
plt.show()
Figure 9
# Plot the threshold-precision curve
tp = trainingSummary.precisionByThreshold.toPandas()
plt.plot(tp['threshold'], tp['precision'])
plt.xlabel('Threshold')
plt.ylabel('Precision')
plt.show()
Figure 10
# Plot the recall-precision curve
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'], pr['precision'])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()
Figure 11
# Plot the threshold-F-Measure curve
fm = trainingSummary.fMeasureByThreshold.toPandas()
plt.plot(fm['threshold'], fm['F-Measure'])
plt.xlabel('Threshold')
plt.ylabel('F-1 Score')
plt.show()
Figure 12

As we increase the prediction threshold the Recall starts dropping while the Precision score improves. It is common practice to visualize competing metrics against one another.
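For illustration, here is a minimal sketch of how the threshold maximizing the F-Measure could be extracted from the training summary (we keep the default threshold for the cross-validation below):

# Find the threshold that maximizes the F-Measure on the training summary
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').head()['max(F-Measure)']
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure).select('threshold').head()['threshold']
print('Best threshold by F-Measure: ', bestThreshold)
# It could then be applied with lr.setThreshold(bestThreshold) before refitting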

Let’s use Cross Validation to tune our logistic regression model:

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

After parameter tuning, the logistic regression shows the following performance:

  • F1-Score: 0.66
  • Recall: 0.84
  • Precision: 0.54

Gradient-Boosted Tree Classifier

Gradient-Boosted Trees are a popular classification method using ensembles of decision trees. Boosting algorithms are generally good choices for class imbalanced data. It is supported by PySpark’s MLlib, so let’s try it on our dataset:

gbt = GBTClassifier()

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

After parameter tuning, the Gradient-Boosted Tree Classifier shows the following performance:

  • F1-Score: 0.58
  • Recall: 0.56
  • Precision: 0.61

Decision Tree Classifier

Decision trees are a popular family of classification and regression methods.

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 4, 6])
             .addGrid(dt.maxBins, [20, 60])
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .build())

cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

After parameter tuning, the Decision Tree Classifier shows the following performance:

  • F1-Score: 0.56
  • Recall: 0.60
  • Precision: 0.52

Conclusion

The goal of this project was to exploit the capabilities of Apache Spark’s analytics engine for large-scale data processing to detect customers who are about to stop using Sparkify’s music streaming service.

We applied the typical steps of the data science process, such as gaining an understanding of the data, data preparation, modeling and evaluation.

The logistic regression model shows the highest performance (F1-Score: 0.66, Recall: 0.84, Precision: 0.54). We are able to recall 84% of the churning customers and can provide them with special offers to keep them from deleting their Sparkify accounts. However, we need to consider the moderate Precision score of 54%: of all the customers who receive special offers, 46% were actually satisfied with the service and would not have needed any special treatment.

The source code for this post can be found on GitHub. I look forward to hearing any feedback or questions.
