
Machine Learning in Apache Spark for Beginners – Healthcare Data Analysis

Quick-start your machine learning journey into the world of Big Data.


Step-by-step guide to building your first machine learning model in Apache Spark using Databricks

Introduction:

Apache Spark is a cluster computing framework designed for fast and efficient computation. It can handle millions of data points with a relatively small amount of computing power. Spark builds on top of Hadoop’s MapReduce model and extends it to efficiently support more types of computation across a cluster. Its main feature is in-memory cluster computing, which speeds up applications such as interactive queries and stream processing.

This post is a quick-start guide for developing a prediction model in Spark using Databricks.

Fig: Databricks: Unified Data Analytics

I will be using the free Community Edition from Databricks, credits to them!

About Data/Background Information:

In this post, I will be using machine learning to help us predict the probability of a patient having diabetes. The dataset is downloaded from the UCI Machine Learning Repository.

Here I am predicting diabetes probability using the information provided about the patient. It is a binary classification problem, where I will try to predict the probability of an observation belonging to a diabetes category.

I am going to first demonstrate a minimum amount of exploratory analysis and later will jump to machine learning models (i.e., regression and tree-based models) and will compare and summarize the results.

Data Pre-processing and Exploration:

The following lines of code load the data and create a DataFrame object. Setting inferSchema to true lets Spark make a good guess about the data type of each column.

# The applied options are for CSV files
# file_location is the path to the uploaded CSV file (e.g., a hypothetical "/FileStore/tables/diabetes.csv")
df = (spark.read.format("csv")
      .option("inferSchema", "true")   # let Spark guess each column's data type
      .option("header", "true")        # first row contains the column names
      .option("sep", ",")              # comma-separated values
      .load(file_location))
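
To quickly verify that inferSchema picked sensible types, you can print the schema of the DataFrame (an optional check; the exact output depends on your copy of the dataset):

df.printSchema()  # lists every column with its inferred type, e.g. integer or double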

I also created a dictionary that groups features by their data types. In our case, we only have IntegerType and DoubleType columns.

from collections import defaultdict
data_types = defaultdict(list)
for entry in df.schema.fields:
  data_types[str(entry.dataType)].append(entry.name)
Fig: Code output showing the columns and their data types

Let’s have a look at the first 5 rows of our dataset.

display(df.limit(5))

The diabetes dataset consists of 768 data points with 9 features each:

"Outcome" is the feature we are going to predict, where 0 means the patient does not have diabetes, and 1 means the patient does have diabetes. Of these 768 data points, 500 are labeled as 0 and 268 as 1.

display(df.groupby('Outcome').count())

One advantage of using Databricks is that it lets you turn a query result into basic plots, which provides a better understanding of the data alongside the code.

We have a complete dataset without any missing values, but to find more information about dealing with missing data, you can consult this article:

https://www.analyticsvidhya.com/blog/tag/missing-values-treatment/
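
Even though this dataset is complete, it is worth confirming that for yourself. Below is a minimal sketch, my own addition rather than part of the original workflow, that counts nulls per column in PySpark:

from pyspark.sql import functions as F

# Count the number of null values in every column
null_counts = df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns
])
display(null_counts)  # should show zeros for this dataset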

Handling Categorical Data:

In our data, we only have a single categorical column, i.e., ‘Pregnancies’, with over 17 categories. The following code shows how to convert categorical columns/features into one-hot encodings. In Spark, a StringIndexer is used first: it assigns a unique integer index to each category, with 0 assigned to the most frequent category, 1 to the next most frequent, and so on.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Categorical columns to encode; in this dataset only 'Pregnancies'
strings_used = ["Pregnancies"]

stage_string = [StringIndexer(inputCol=c, outputCol=c + "_string_encoded")
                for c in strings_used]
stage_one_hot = [OneHotEncoder(inputCol=c + "_string_encoded",
                               outputCol=c + "_one_hot")
                 for c in strings_used]

ppl = Pipeline(stages=stage_string + stage_one_hot)
df = ppl.fit(df).transform(df)
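
To see what the pipeline produced, you can display the original column next to its indexed and one-hot encoded versions (the column names follow from the code above):

display(df.select("Pregnancies", "Pregnancies_string_encoded", "Pregnancies_one_hot").limit(5))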

In the above code, I have used a Pipeline, which runs a series of stages in a single pass. One can simply make a list of stages and the pipeline will handle the rest.

In general, a machine learning pipeline describes the end-to-end process of writing code, releasing it to production, extracting data, training models, and tuning the algorithm; it is a continuous process when working on an ML platform. In Apache Spark, however, a Pipeline is a single object that chains the transform, fit, and evaluate steps together, and these chained steps make up the ML workflow.
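
As an illustration of how far this chaining can go, here is a minimal sketch (my own, not part of the original workflow) that bundles the encoding step above with the assembling and classification steps that appear later in this post into a single Pipeline. It assumes df is the raw DataFrame as originally loaded, before the encoded columns were added:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

indexer = StringIndexer(inputCol="Pregnancies", outputCol="Pregnancies_string_encoded")
encoder = OneHotEncoder(inputCol="Pregnancies_string_encoded", outputCol="Pregnancies_one_hot")
assembler = VectorAssembler(
    inputCols=["Pregnancies_one_hot", "Glucose", "BloodPressure", "SkinThickness",
               "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"],
    outputCol="features")
rf = RandomForestClassifier(labelCol="Outcome", featuresCol="features", numTrees=20)

# One object that runs the entire workflow: index -> encode -> assemble -> classify
full_pipeline = Pipeline(stages=[indexer, encoder, assembler, rf])
pipeline_model = full_pipeline.fit(df)  # fit runs every stage in order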

Vector Assembler:

The idea here is to assemble a given list of columns into a single vector column, bundling them together. This is an additional step required by Spark’s machine learning models, and it is usually performed at the end of the data exploration and pre-processing steps. At this stage, I am working with a mix of raw and transformed features that can be used to train a model.

from pyspark.ml.feature import VectorAssembler
features = ['Pregnancies_one_hot','Glucose','BloodPressure',
'SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age']
vector_assembler = VectorAssembler(inputCols = features, 
                                   outputCol= "features")
data_training_and_test = vector_assembler.transform(df)
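
A quick way to confirm that the assembled vector looks right (the column names come from the code above):

data_training_and_test.select("features", "Outcome").show(5, truncate=False)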

Model Fitting:

We have several built-in classifiers available, including random forests, gradient-boosted trees, logistic regression, and more. To start, I am implementing a random forest as an example, specifying the number of trees in the classifier and leaving the remaining parameters at their default values.

To evaluate the performance of our model, I am using the area under the ROC curve (areaUnderROC) as the metric. You can choose a different ‘metricName’ if you prefer.

The model scores an area under the ROC curve of 82.5%, which indicates it works quite well with the default parameters.

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 70/30 train/test split with a fixed seed for reproducibility
(training_data, test_data) = data_training_and_test.randomSplit([0.7, 0.3], 2017)

rf = RandomForestClassifier(labelCol="Outcome",
                            featuresCol="features", numTrees=20)
rf_model = rf.fit(training_data)
predictions = rf_model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol="Outcome",
                                          rawPredictionCol="probability",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC:", auc * 100)

Feature Selection:

The feature selection process helps filter out less important variables, which can lead to a simpler and more stable model. In Spark, implementing feature selection is not as easy as in, for example, Python’s scikit-learn, but it can be managed by making feature selection part of the pipeline. The idea is:

  1. Fit a classifier first. For instance, you can go with a regression or tree-based model, any model of your choice.
  2. Find the feature importances if you use a random forest, or the coefficients if you use logistic regression (see the sketch after this list).
  3. Store the most important set of features in a list.
  4. Use the ‘VectorSlicer’ method from the ml library to make a new vector from the list you just selected.
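
For step 2, here is a minimal sketch (my own illustration) of how you could pull coefficients out of a logistic regression instead of random forest importances:

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="Outcome", featuresCol="features")
lr_model = lr.fit(training_data)

# One coefficient per entry of the assembled feature vector;
# larger absolute values suggest more influential features
print(lr_model.coefficients.toArray())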

The following code shows how to create a list of important features from the model we previously fitted. Features with an importance greater than 0.03 are kept; rf_model is the fitted random forest model.

import pandas as pd

# featureImportances is a Spark vector; convert it to a dense array first
importance_list = pd.Series(rf_model.featureImportances.toArray())
sorted_imp = importance_list.sort_values(ascending=False)
kept = list((sorted_imp[sorted_imp > 0.03]).index)

The 0.03 threshold is arbitrary; one can try different values and compare them using the AUC metric. I then used VectorSlicer to collect all features with an importance greater than 0.03.

from pyspark.ml.feature import VectorSlicer

# Keep only the feature indices selected above
vector_slicer = VectorSlicer(inputCol="features",
                             indices=kept, outputCol="feature_subset")
with_selected_feature = vector_slicer.transform(training_data)
test_data = vector_slicer.transform(test_data)

rf_modified = RandomForestClassifier(numTrees=20,
                                     labelCol="Outcome",
                                     featuresCol="feature_subset")
prediction_modified = (rf_modified.fit(with_selected_feature)
                                  .transform(test_data))

evaluator_modified = BinaryClassificationEvaluator(labelCol="Outcome",
                                                   rawPredictionCol="probability",
                                                   metricName="areaUnderROC")
auc = evaluator_modified.evaluate(prediction_modified)
print("Area under ROC:", auc * 100)

With feature selection, we saw an improvement of roughly 1 percentage point in the area under the ROC curve: about 82% with the full feature set versus 83% with the selected features. When dealing with Big Data, even a 1% improvement matters.

In the end, I just want to conclude by saying that Apache Spark is a concise and easy-to-use open-source framework.


Thank you for taking the time to read!

I am always looking to learn and grow, so reach out to me if you have any questions or suggestions!

LinkedIn | [email protected] | Github

