Learn How To Use Spark ML and Spark Streaming

A tutorial on how to use SparkML to make predictions on streaming data using PySpark

Navid Mashinchi
Towards Data Science

--

Photo by Markus Winkler on Unsplash

Objective:

In this article, I am going over an example of how we can use Spark ML to make predictions on streaming data. Note we won’t focus on comparing different models and tuning the model. The main focus will be on how we can incorporate Spark Streaming to make predictions using databricks. In addition to that, you should have some basic knowledge of how to use Spark ML. If Spark ML is new to you, check out the video below.

For this example, we will predict whether someone will get a heart attack based on their age, gender, and medical conditions. A logistic regression will be trained, and we stream in unseen data to make predictions.

Data Collection:

Photo by jesse orrico on Unsplash

For this project, I used a dataset that is available on Kaggle. Please click here if you want to follow along. The data consists of 303 rows and 14 columns. Each row represents the information of a patient. The features of this dataset consist of the following columns:

  • age: age in years1. age: age in years
  • sex: sex (1 = male; 0 = female)
  • cp: chest pain type (1 = typical angina; 2 = atypical angina; 3 = non-
    anginal pain; 0 = asymptomatic)
  • trtbps: resting blood pressure (in mm Hg on admission to the hospital)
  • chol: serum cholesterol in mg/dl
  • fbs: fasting blood sugar > 120 mg/dl (1 = true; 0 = false)
  • restecg: resting electrocardiographic results (1 = normal; 2 = having ST-T
    wave abnormality; 0 = hypertrophy)
  • thalachh: maximum heart rate achieved
  • exng: exercise-induced angina (1 = yes; 0 = no)
  • oldpeak: ST depression induced by exercise relative to rest
  • slp: the slope of the peak exercise ST segment (2 = upsloping; 1 = flat;
    0 = downsloping)
  • caa: number of major vessels (0-3) colored by fluoroscopy
  • thall: 2 = normal; 1 = fixed defect; 3 = reversible defect

The target column is the following:

  • output: 0= less chance of heart attach 1= more change of heart attack.

The first step is to create a schema to ensure that the data will consist of the correct data type when reading in the csv file. Next, we will use spark.read.format() function with “csv” as an argument, add the option to read in the header and assign the schema we created to the data frame. Last we load the data, and we also change the target column to label so that our logistic regression can identify which column the target variable is.

from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType
# We use the following schema
schema = StructType( \
[StructField("age", LongType(),True), \
StructField("sex", LongType(), True), \
StructField("cp", LongType(), True), \
StructField('trtbps', LongType(), True), \
StructField("chol", LongType(), True), \
StructField("fbs", LongType(), True), \
StructField("restecg", LongType(), True), \
StructField("thalachh", LongType(), True),\
StructField("exng", LongType(), True), \
StructField("oldpeak", DoubleType(), True), \
StructField("slp", LongType(),True), \
StructField("caa", LongType(), True), \
StructField("thall", LongType(), True), \
StructField("output", LongType(), True), \
])
data = "dbfs:/FileStore/tables/heart.csv"
df=spark.read.format('csv').option('header',True).schema(schema).load(data)
df = df.withColumnRenamed("output","label")
df.display()
df.printSchema()

Once we run the above code, we get the following output below:

Image by author

Machine Learning:

When it comes to data preprocessing steps, I first split the data into a training (70%) and test (30%) set.

testDF, trainDF = df.randomSplit([0.3, 0.7])

After that, I created a pipeline that consists of five stages. The first stage is a vector assembler that takes in the age, trtbps, chol, thalachh, oldpeak columns and turns them into a vector. The second stage entails the scaling process of the features mentioned above. I am using the MinMaxScaler() function from the pyspark.ml.feature library. After that, I one-hot encode the sex, cp, fbs, restecg, slp, exng, caa, and thall columns, since those are nominal categorical variables. Next, I create a second vector assembler and add the one-hot encoded columns and scaled features into one vector. Last but not least, the last stage consists of a Logistic Regression with the following parameters:

  • maxIter = 10
  • regParam = 0.01

I chose a logistic regression algorithm because our target consists of binary numbers (0 and 1). Once the pipeline has been created, I fit and transform the training set. After that, I select the label, probability, and prediction columns. Please see below a snippet of the pipeline construction and the predictions when training the model.

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam= 0.01)
# We create a one hot encoder.
ohe = OneHotEncoder(inputCols = ['sex', 'cp', 'fbs', 'restecg', 'slp', 'exng', 'caa', 'thall'], outputCols=['sex_ohe', 'cp_ohe', 'fbs_ohe', 'restecg_ohe', 'slp_ohe', 'exng_ohe', 'caa_ohe', 'thall_ohe'])
# Input list for scaling
inputs = ['age','trtbps','chol','thalachh','oldpeak']
# We scale our inputs
assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_scaled1")
scaler = MinMaxScaler(inputCol="features_scaled1", outputCol="features_scaled")
# We create a second assembler for the encoded columns.
assembler2 = VectorAssembler(inputCols=['sex_ohe', 'cp_ohe', 'fbs_ohe', 'restecg_ohe', 'slp_ohe', 'exng_ohe', 'caa_ohe', 'thall_ohe','features_scaled'], outputCol="features")
# Create stages list
myStages = [assembler1, scaler, ohe, assembler2,lr]
# Set up the pipeline
pipeline = Pipeline(stages= myStages)
# We fit the model using the training data.
pModel = pipeline.fit(trainDF)
# We transform the data.
trainingPred = pModel.transform(trainDF)
# # We select the actual label, probability and predictions
trainingPred.select('label','probability','prediction').show()
Image by author

As we can see above, the marked yellow rows show that the lower the probability, the more confident the model is that the prediction is a 1. On the other hand, the marked red row shows the higher the probability, the more certain it predicts the output to be a zero. In addition to that, I evaluate the performance of the model, calculating the overall accuracy score. Please see below the code snippet of how I calculated the accuracy score.

Image by author

When training the model on the training data, the accuracy score resulted in 0.902913%, which is a satisfying result.

Streaming:

To incorporate Spark Streaming, I repartitioned the test data set into ten different files to replicate the streaming simulation.

# We now repartition the test data and break them down into 10 different files and write it to a csv file.
testData = testDF.repartition(10)
#Remove directory in case we rerun it multiple times.
dbutils.fs.rm("FileStore/tables/HeartTest/",True)
#Create a directory
testData.write.format("CSV").option("header",True).save("FileStore/tables/HeartTest/")

After that, I first created a source, which consisted of the following lines of code.

# Source
sourceStream=spark.readStream.format("csv").option("header",True).schema(schema).option("ignoreLeadingWhiteSpace",True).option("mode","dropMalformed").option("maxFilesPerTrigger",1).load("dbfs:/FileStore/tables/HeartTest").withColumnRenamed("output","label")

As you can see above, I use spark.readStream and read in a file with the format “csv”. In addition to that, I add the schema that I created at the beginning when reading in the file, followed by multiple options such as:

  • ignoreLeadingWhiteSpace: True → Removes the leading whitespace.
  • mode: DropMalformed → When set to DropMalformed it ignores the whole corrupted records.
  • maxFilesPerTrigger: 1 → Maximum number of new files to be considered in every trigger.

After that, I load in the data from the directory where I repartitioned the test data to replicate the simulation of streaming. Last but not least, I change the output column to label for the implementation of the logistic regression.

The last step is to set up the streaming of the test data. I use the pipeline that has been fit to the training set (pModel) and use the transform functionality with the argument “sourceStream”, which is the source I created before. Lastly, I select the label, probability, and prediction columns. Please see below for the code snippet.

Image by author

As we can see, the green light is on, which indicates that we are streaming in unseen data, which comes from the test data that has been repartitioned to replicate the simulation for streaming purposes. Below is a sample output of our streaming, which shows us the actual label of the test data, the probability, and the model’s prediction on the unseen data.

Image by author

To assess the predictions on the test data, we can see the probabilities of which class the streamed data is part of. For example, when we look at row 1, we can see the vector in the probability column, which consists of [0.06936682704327157, 0.9306331729567284]. The first element in the vector represents the probability of class 0 (no heart attack), and the second element the probability of class 1 (heart attack). The model picks the higher probability value and assigns the streamed data to the class with the higher probability. In the first example, the model predicts 1, which is correct when comparing it with the actual label.

Final Words:

I hope this example helped you to understand better how to make predictions using streamed data using Spark. Again the main focus of this article wasn’t about the performance of the model but more about how we can use unseen streamed data in our machine learning model. If you have more questions about incorporating Spark Streaming to Spark ML, I highly suggest going over the databricks documentation. Please see the link below:

If you have any questions on this topic or have some feedback, feel free to contact me. I would appreciate it if you would share this article on any social media platforms. Thank you and until next time!✌️

--

--

Data Scientist at Kohl’s | Adjunct Professor at University of Denver | Data Science Mentor at SharpestMinds