Serving ML Models with Apache Spark

An end-to-end guide on how to serve models with PySpark

Pınar Ersoy
Towards Data Science


Processing large datasets comes with the difficulties of restrictions imposed by technologies and programming languages. An impactful first step is becoming aware of distributed processing technologies and their supporting libraries. This article is intended for machine learning engineers and data scientists who want to use the data processing, MLlib, and model serving capabilities of Apache Spark.

What is Apache Spark?

Apache Spark is a system that provides a cluster-based distributed computing environment with the help of its broad packages, including:

  • SQL querying,
  • streaming data processing, and
  • machine learning.

Apache Spark supports Python, Scala, Java, and R programming languages.

Apache Spark provides an in-memory computing environment. According to the book “Cloud Computing Technologies for Green Enterprises” by Kashif Munir, a running job can be up to 100 times faster in memory and up to ten times faster on disk.

What is PySpark?

Originally, Apache Spark was implemented in the Scala language. Since most machine learning libraries and higher-level data processing packages are written in Python, the demand for integration with Spark was obvious. To meet this demand, a Python API named PySpark was developed for Spark. It is built on top of a library called Py4J, which lets the Python interpreter communicate with objects in the Java Virtual Machine (JVM).

How does Spark work?

Apache Spark processes jobs horizontally across a cluster. It speeds up this work by combining in-memory computation with rich SQL capabilities. Spark can, among other things:

  • run numerous distributed scripts,
  • process data,
  • build data workflows, and
  • apply analytical methods with MLlib functions

Spark components

The Spark project comprises several tightly integrated components. At its center, Spark contains a computation engine that can schedule, parallelize, and monitor numerous applications. Using all the Spark components simultaneously is not compulsory; depending on the case and requirements, some of them may be used together with Spark Core. However, Spark Core itself is obligatory since it is the kernel of the Spark architecture.

Spark core

Spark Core is the general execution engine at the center of the platform that supports the rest of its range of capabilities.

Spark streaming

Spark Streaming allows Spark to work with online streaming data consumed from various systems, including HDFS, S3, Kafka, Flume, etc., and to write the results out to different storage systems.
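As a minimal sketch (assuming an existing `SparkContext` named `spark_context` and a hypothetical text source listening on localhost port 9999), a discretized stream can be created and processed in micro-batches:

# Minimal Spark Streaming sketch; the socket source and spark_context are assumptions
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark_context, batchDuration=5)  # 5-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)          # consume a text stream
lines.count().pprint()                                    # print the record count of each batch

ssc.start()                        # start receiving data
ssc.awaitTerminationOrTimeout(60)  # run for up to a minute
ssc.stop(stopSparkContext=False)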

Spark SQL

Spark SQL is the main module for querying and processing structured data. A diverse range of data formats can be read using Spark SQL, including Parquet, JSON, Avro, and more. What's more, it allows User Defined Function generation and HiveQL usage.
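As a minimal sketch (assuming an active `SparkSession` named `spark` and hypothetical file and column names), structured files can be read and a User Defined Function can be registered for use in a SQL query:

# Hypothetical files and column names; spark is an assumed active SparkSession
from pyspark.sql.types import StringType

parquet_df = spark.read.parquet("events.parquet")  # read a structured file
json_df = spark.read.json("events.json")

# Register a User Defined Function and call it from SQL
spark.udf.register("to_upper", lambda s: s.upper() if s else None, StringType())
parquet_df.createOrReplaceTempView("events")
spark.sql("SELECT to_upper(name) AS name FROM events").show()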

GraphX

GraphX is Spark's component for graph processing with parallel, distributed execution. Abstractly, a graph is composed of vertices and edges, and GraphX integrates graph computations directly into Spark.

MLlib (Machine learning)

MLlib is the core machine learning library for Spark. It takes a distributed approach to storing and processing data. It is composed of various algorithms, including regression, decision trees, k-means clustering, etc.

Spark architecture

Spark architecture is composed of a driver and worker nodes. These nodes are connected with the help of a cluster manager.

Driver node

The driver node is the master node responsible for executing the `main()` method. Its primary purpose is to create the required Spark session successfully.

Cluster manager

The cluster manager is the component that distributes resources among the requested jobs. You can use Hadoop YARN, Mesos, or Kubernetes as the cluster manager, as sketched below.
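The choice of cluster manager is expressed through the master URL passed to the Spark configuration; a short sketch with hypothetical host addresses:

# Master URLs select the cluster manager (host addresses are hypothetical)
from pyspark import SparkConf

SparkConf().setMaster("local[*]")                 # no cluster manager, run locally on all cores
SparkConf().setMaster("yarn")                     # Hadoop YARN
SparkConf().setMaster("mesos://host:5050")        # Apache Mesos
SparkConf().setMaster("spark://host:7077")        # Spark standalone cluster manager
SparkConf().setMaster("k8s://https://host:6443")  # Kubernetes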

Worker node

Worker nodes carry out the actual processing of the related code blocks. Inside each worker, an executor keeps the data in memory while executing the scheduled jobs. The smallest unit of work in the executor architecture is called a task.

Why use Apache Spark

There are countless reasons to choose Spark. The most important ones are its ease of use, its rapidity, and its support.

Ease of use

Spark's capabilities are exposed through numerous APIs, all designed explicitly for interacting quickly and effectively with data at scale. With its easy-to-understand structure, users can produce results with Spark in a short time.

Rapidity

Spark is designed for fast performance. It works both in memory and on local storage. Its execution speed can be up to a hundred times higher than that of Hadoop's MapReduce.

Support

Spark supports multiple programming languages, including Python, R, Java, and Scala. It integrates with the storage systems of the Hadoop environment. Besides, the Apache Spark developer community is huge, active, and worldwide. Commercial Hadoop vendors also offer extensive support for Spark applications.

Spark installation

Spark can be installed in separate ways depending on the platform. In this section, let’s introduce two different installation options:

  • setting it up on Google Colab, and
  • installing it on your local machine.

Setting up Spark on Google Colab

Google Colab is an environment where users can run their Python scripts directly in the browser. For Spark with Python to be executed on Google Colab, you have to install the appropriate Spark, Hadoop, and Java versions. Installing Spark on Google Colab can be done as shown below:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz

After successfully installing the corresponding versions on Google Colab, you can set up the environment variables for Spark and Java.

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

The `findspark` package helps locate the previously installed Spark installation. Then, it makes PySpark importable as a regular library with the help of `findspark.init()`.

import findspark
findspark.init()

Installing Apache Spark on the local machine

Apache Spark can run in any environment where Python, Scala, or Java is installed. This article focuses on the Python language. The easiest way to install the required Python packages and Jupyter Notebook is to use Anaconda.

Install Spark using the command below on the Anaconda Prompt:

conda install pyspark

Apache Spark basics

Spark supports the Resilient Distributed Dataset (RDD) structure. An external data source can be read into this structure, and functions can be passed to Spark and applied to an existing dataset or used to derive a new one.
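For example, an external text file can be read into an RDD and a function passed to Spark (a sketch assuming a `SparkContext` named `spark_context` and a hypothetical file name):

# Read an external data source into an RDD and pass functions to Spark (file name is hypothetical)
lines = spark_context.textFile("dataset.txt")      # external data source as an RDD
line_lengths = lines.map(lambda line: len(line))   # pass a function to Spark
print(line_lengths.reduce(lambda a, b: a + b))     # total number of characters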

You’ll learn more about the RDD structure, Spark Transformations, and Actions in the upcoming sections.

Resilient Distributed Datasets (RDDs)

The Resilient Distributed Dataset is the essential user-facing programming interface in Spark for in-memory computation. It is a distributed collection of data elements.

RDD creation

You need an active Spark context before you can create an RDD. It is created with the help of the `SparkContext` class, while `SparkConf` is used to set the Spark configuration.

from pyspark import SparkContext, SparkConf

The next step is to define our desired Spark configuration. In this case, we’ll use a local cluster since we are working on a local machine. You also have the option to set up Spark in cluster mode.

Specifying `local[*]` means that Spark will use all the cores in the local machine. This is usually the default setting in stand-alone mode.

spark_configurations = (SparkConf()
                        .setMaster("local[*]")
                        .setAppName("firstSparkSession")
                        .set("spark.executor.memory", "2g"))

With this configuration at hand, we can create the Spark context.

spark_context = SparkContext(conf = spark_configurations)

There are some built-in attributes for viewing the Spark configuration.

The Spark version can be retrieved using the `version` attribute.

spark_context.version

The Python version can be displayed using the `pythonVer` attribute.

spark_context.pythonVer

To view the master URL, including the number of cores assigned in stand-alone mode, you can read the `master` attribute of the Spark context. In the example below, the Spark context variable is called `spark_context`.

spark_context = SparkContext(conf=spark_configurations)
spark_context.master

Every Spark application has a name. The `setAppName` method can be used to set it.

spark_configurations = (SparkConf().setAppName("firstSparkSession"))
spark_context = SparkContext(conf=spark_configurations)

After assigning the application name, it is possible to view it with the `appName` attribute.

spark_context.appName

Spark creates a unique application id for every session. The id can be retrieved using the `applicationId` attribute.

spark_context.applicationId

Spark distributes jobs in every Spark session through parallelism. You can set this manually or use the default options.

Default settings can be viewed using the `defaultParallelism` attribute.

spark_context.defaultParallelism

You can set the default parallelism in the configuration phase of the Spark context. This is done by setting the `spark.default.parallelism` property on the `SparkConf` object.

spark_configurations.set("spark.default.parallelism", "50")

Additionally, Spark allows a different number of partitions to be assigned to each RDD. The desired number of partitions can be passed as the second argument when creating an RDD with `parallelize`. In the example below, `4` is the chosen number of partitions, and `getNumPartitions()` confirms it.

partitioned_items = spark_context.parallelize(range(100), 4)
partitioned_items.getNumPartitions()

To print the default setting for the minimal count of partitions for the RDD, use the `defaultMinPartitions` attribute.

spark_context.defaultMinPartitions

RDD operations

In Spark, RDD operations are composed of `Transformations` and `Actions`. `Transformations` are operations that create a new RDD from an existing one, while `Actions` compute a result and return it to the driver. Transformations are evaluated lazily: nothing is computed until an action is called.
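A minimal sketch of this distinction, reusing the `spark_context` created earlier:

numbers = spark_context.parallelize([1, 2, 3, 4])
doubled = numbers.map(lambda x: x * 2)   # transformation: defines a new RDD, nothing runs yet
print(doubled.collect())                 # action: computation happens here -> [2, 4, 6, 8]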

Spark Transformations

Several Spark transformations are available in an active Spark Session. In this section, let’s introduce the most common.

Map

The `Map` method returns a new distributed dataset resulting from passing each element through a function. In the following example, the `collect` operation is responsible for retrieving all the items in the existing RDD.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
mapped_list = items.map(lambda x: x + 2).collect()
print("Printing mapped items for map operation of RDD: ", mapped_list)

FlatMap

The flatMap method operates by performing a computation on each item of the RDD followed by a flattening operation.

items = spark_context.parallelize([2, 4, 13])
items.flatMap(lambda x: range(1, x)).collect()

MapPartitions

With the help of `mapPartitions`, a method can be applied to every partition of the specified RDD.

partitioned = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56], 2)

def mapPartitionFunc(iterator):
    yield sum(iterator)

partitioned.mapPartitions(mapPartitionFunc).collect()

MapPartitionsWithIndex

The `mapPartitionsWithIndex` method applies a function to every partition of the RDD while also passing the index of that partition to the function.

partitioned = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56], 4)

def mapPartitionByIndexFunc(partition_index, iterator):
    yield partition_index

partitioned.mapPartitionsWithIndex(mapPartitionByIndexFunc).sum()

Filter

The filter method returns a new dataset after selecting the items that return `true` on a certain condition.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
filtered_list = items.filter(lambda x: x % 2 == 0).collect()
print("Printing filtered list items for filter operation of RDD: ", filtered_list)

Sample

Sampling can be used at any stage of data processing. For RDDs, sampling is applied by passing a fraction to the `sample` function. To obtain the same subset on repeated runs, a seed can be passed to the method.

sampling_items = spark_context.parallelize(range(20), 4)
sampling_items.sample(True, 0.3, 1234).collect()

Join

RDD datasets can be joined on a pair of matching keys using the `join` method.

list1 = spark_context.parallelize([("k", 98), ("m", 65)])
list2 = spark_context.parallelize([("k", 120), ("k", 43)])
sorted(list1.join(list2).collect())

Union

The `union` operation concatenates the specified RDDs, appending one after the other. It does not look for matching keys between them.

union_items = spark_context.parallelize(range(5), 2)
union_items.union(union_items).collect()

Intersection

The `intersection` method is responsible for finding the intersecting group of elements in RDD datasets.

group1 = spark_context.parallelize([2, 10, 17, 3, 14, 5])
group2 = spark_context.parallelize([2, 8, 5, 34, 42, 14])
group1.intersection(group2).collect()

Distinct

The `distinct` function is used to obtain a unique group of elements from an RDD.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
unique_element_list = items.distinct().collect()
print("Printing distinct items for distinct operation of RDD: ", unique_element_list)

GroupByKey

The `groupByKey` function groups the values for each key in the RDD into a single sequence. The output RDD is hash-partitioned by key.

groupedKeys = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900)])
print(sorted(groupedKeys.groupByKey().mapValues(len).collect()))
print(sorted(groupedKeys.groupByKey().mapValues(list).collect()))

ReduceByKey

The `reduceByKey` method merges the values for each key using the provided function.

from operator import sub
reducedKeys = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900), ("second_num", 500)])
print(sorted(reducedKeys.reduceByKey(sub).collect()))

AggregateByKey

The `aggregateByKey` method aggregates the values of each key using two functions and a neutral zero value: the first function combines values within a partition, and the second merges the partial results across partitions. The example below applies the same pair of functions with `aggregate` on a plain RDD and with `aggregateByKey` on a key-value RDD.

item_group1 = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])
item_group2 = spark_context.parallelize(range(20))

firstGroup = (lambda x,y: (x[0]+y,x[1]+1))
aggregatedGroup = (lambda x,y:(x[0]+y[0],x[1]+y[1]))

print(item_group2.aggregate((0,0),firstGroup,aggregatedGroup))
print(item_group1.aggregateByKey((0, 0), firstGroup, aggregatedGroup).collect())

SortByKey

The `sortByKey` method sorts the key-value pairs by key, in ascending order by default.

item_list = [('first', 7), ('second', 9), ('third', 11), ('fourth', 34), ('fifth', 58)]
spark_context.parallelize(item_list).sortByKey().first()

Spark Actions

Let’s now take a look at some Spark Actions.

Collect

The `collect` function returns all the elements of the dataset as an array.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
number_list = items.collect()
print("Printing elements for collect: %s" % number_list)

First

The `first` method is used to get the first item from an RDD.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
first_element = items.first()
print("Printing first element with first operation of RDD: %s" % first_element)

Take

The `take(n)` method returns the first n elements of the dataset.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
take_element = items.take(3)
print("Printing specified number of elements with take operation of RDD: %s" % take_element)

Take Sample

The `takeSample` method returns a fixed-size sample of the RDD. Its first parameter is `withReplacement`, which indicates whether the same element may be selected more than once; set it to `True` for sampling with replacement and `False` otherwise.

The second parameter is the number of elements to sample. The third parameter is the `seed`. When set to the same number, the method returns the same subset of samples every time it is run inside the current Spark session.

items = spark_context.parallelize([5, 13, 13, 28, 36, 47, 56])
items.takeSample(True, 5, 1)

TakeOrdered

The `takeOrdered` function returns the specified number of items from the RDD, ordered in ascending order.

items = spark_context.parallelize([44, 131, 836, 147, 56]).takeOrdered(6)
print(items)

Count

The `count` function returns the number of elements in the RDD, counting duplicate records as separate elements.

element_count = items.count()
print("Printing number of instances for count operation of RDD: %i" % element_count)

CountByKey

The `countByKey` function differs from the `count` function by counting the items by their corresponding keys.

countKey = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900), ("second_num", 500)])
sorted(countKey.countByKey().items())

SaveAsTextFile

RDD datasets can be saved in text format with the help of the `saveAsTextFile` function.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
items.saveAsTextFile("items.txt")

RDD persistence

The main advantage of Spark is its ability to keep a dataset in memory across partitions. Persistence is achieved through caching: the partitions of an RDD are kept in memory so they can be reused in different operations on that dataset, which makes future actions much faster.

When an RDD is persisted for the first time, its partitions are kept in memory on the nodes that computed them. If some of these partitions are lost, Spark can recompute them from the transformations that originally created them.

Caching also lets users keep a dataset on disk and reuse it across other jobs. Some RDDs are used several times, and calling the `persist` operation on those RDDs can be advantageous.
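For instance, `cache()` is shorthand for persisting an RDD with the default storage level (memory only); a small sketch reusing `spark_context`:

# cache() persists the RDD with the default storage level so later actions reuse it
reused = spark_context.parallelize(range(1000)).map(lambda x: x * x)
reused.cache()          # keep the computed partitions in memory
print(reused.count())   # the first action computes and caches the RDD
print(reused.sum())     # later actions reuse the cached partitions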

The `persist` method accepts a storage level. When `MEMORY_AND_DISK` is selected, RDD partitions are stored in memory and spilled to disk, with a single copy of each partition.

import pyspark

item_list = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])
item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
print(item_list.getStorageLevel())

On the other hand, when `MEMORY_AND_DISK_2` is selected, each RDD partition is replicated on two nodes, using both memory and disk.

item_list = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])
item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
print(item_list.getStorageLevel())

Spark session creation with PySpark

A Spark session is required to run PySpark functions. First, the required libraries have to be imported.

from pyspark.sql import SparkSession
from pyspark.context import SparkContext

After loading the related libraries, a spark session can be initiated by simply adding an `appName` and a `getOrCreate` function. If any additional config requirement exists, such as the memory size of the executor, then a `config` parameter can be included in the Spark session building block.

spark = SparkSession.builder.\
    appName("FirstSparkApplication").\
    config("spark.executor.memory", "8g").\
    getOrCreate()

Creating Spark Dataframes by reading different Data formats

After creating the session, data can be read and loaded into DataFrames, the supported data structure. A DataFrame can be described as a distributed collection of data organized into named columns, similar to a table.

Below, files in different formats are read using the corresponding reader functions.

json_df = spark.read.json("dataset.json")
text_df = spark.read.text("dataset.txt")
csv_df = spark.read.csv("dataset.csv")
parquet_df = spark.read.parquet("dataset.parquet")

Machine learning in Spark

Spark contains a separate library called MLlib that supports several machine learning algorithms. The core areas that MLlib covers are:

  • machine learning computations,
  • featurization,
  • generating pipeline structures,
  • persistence

Let’s discuss the steps for implementing a machine learning model using Spark.

Data preparation

Throughout this article, a very well-known dataset, the “Titanic,” will be used. The public dataset can be downloaded from its GitHub page. Also, you can view its license from the link.

As the first step, we’ll read the dataset with the help of the Spark session.

The dataset is in CSV format, so the `csv` format is specified as a parameter of `spark.read.format()`.

training_dataset = spark.read.format("csv").\
    option("inferSchema", True).\
    option("header", "true").\
    load("dataset/titanic_train.csv")

test_dataset = spark.read.format("csv").\
    option("inferSchema", True).\
    option("header", "true").\
    load("dataset/titanic_test.csv")

As an initial analysis step, let's display the data together with its column names. This can be done in three different ways using PySpark.

The first way is to use the `show` method while passing the number of rows you’d like to display as the argument.

training_dataset.show(5)
Fig.1. Output of `show(5)` function. Owned by the author.

The second way is to use the `show()` method without passing any arguments. This action will output 20 rows. Its default format contains truncated column content. Truncated columns can be observed for the `Name` column since it exceeds the default length.

training_dataset.show()
Fig.2. Output of `show()` function. Owned by the author.

The full column content can be viewed by setting `truncate = False` in the `show` method. Also, the default horizontal display can be changed by adding `vertical = True` in the `show()` function.

training_dataset.show(2, truncate=False, vertical=True)
Fig.3. Output of `show(truncate=False, vertical=True)` function. Owned by the author

Data preprocessing with Spark

After viewing the column names and their types, it is crucial to check whether the dataset includes any `null` or `NaN` values. We have to handle them before the modeling step.

Let’s display the null and non-null columns below.

from pyspark.sql.functions import *

print("NaN values\n")
training_dataset.select([count(when(isnan(item), item)).alias(item) for item in training_dataset.columns]).show(5)

print("Null values\n")
training_dataset.select([count(when(col(item).isNull(), item)).alias(item) for item in training_dataset.columns]).show(5)

print("Not Null values\n")
training_dataset.select([count(when(col(item).isNotNull(), item)).alias(item) for item in training_dataset.columns]).show(5)
Fig 4. The output of filled and unfilled values. Owned by the author

Columns can be renamed using the `withColumnRenamed` function. With this approach, multiple columns can be renamed by simply chaining the calls one after another. The function's first parameter is the existing column name, and the second parameter is the new column name.

print("Renaming Column Name")
training_dataset = training_dataset.\
    withColumnRenamed("Pclass", "PassengerClasses").\
    withColumnRenamed("Sex", "Gender")
training_dataset

The `groupBy` SQL operation can be applied to a single column together with a `count` operation. Additionally, multiple columns can be passed to the function to group by several columns at once.

The `sort()` function can also be chained after `count()`. Observing the counts by class in the output, we can see that the `Survived` count is highest in the third passenger class for both genders.

print("Counting the number of Passenger per Classes")
training_dataset.groupBy("PassengerClasses").\
    count().\
    sort("PassengerClasses").show()


print("Counting the number of Survivals by Classes")
training_dataset.groupBy("PassengerClasses", "Gender", "Survived").\
    count().\
    sort("PassengerClasses", "Gender", "Survived").show()
Fig 5. The output of the `groupBy` operation. Owned by the author.

Feature engineering with PySpark

With the help of feature engineering, more insightful information can be extracted from the existing variables in the dataset.

There's a `Name` column in the Titanic dataset that also includes the person's title. This information might be beneficial for the model, so let's extract it as a new variable. A new title column can be created using the `withColumn` operation.

training_dataset = training_dataset.withColumn("Title", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
training_dataset.select("Name", "Title").show(10)
Fig 6. Output of `withColumn` title extraction operation. Owned by the author.

A new column is produced with the `Title` name. Listing each title by its count can show us that some titles are seen only once.

training_dataset.groupBy(“Title”).count().show()
Fig 7. The output of the `groupBy` operation. Owned by the author.

There are some duplicated titles that are in different formats. Some of them can be replaced. For this purpose, the `replace` function can be applied.

feature_df = training_dataset.\
    replace(["Mme",
             "Mlle", "Ms",
             "Major", "Dr", "Capt", "Col", "Rev",
             "Lady", "Dona", "the Countess", "Countess", "Don", "Sir", "Jonkheer", "Master"],
            ["Mrs",
             "Miss", "Miss",
             "Ranked", "Ranked", "Ranked", "Ranked", "Ranked",
             "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty"])

feature_df.groupBy("Title").count().sort(desc("count")).show()
Fig 8. Output of `groupBy` operation for `Title` column. Owned by the author.

After the replacement operation, the distribution of the titles seems more accurate than before.

Building the machine learning model with PySpark MLlib

Before the model implementation phase, the types of the variables should be inspected. Since the prediction algorithms require numerical variables, string-formatted columns may cause errors.

PySpark’s `dtypes` function can be used to print the types of the variables.

feature_df.dtypes
Fig 9. The output of `dtypes` operation for the dataframe. Owned by the author.

After printing the types of the variables, it can be observed that the `Gender`, `Embarked`, and `Title` columns have a string format. These columns need to be converted to a numerical form.

A specialized PySpark component called `StringIndexer` fits and transforms variables into numeric indices. Let's implement it below.

from pyspark.ml.feature import StringIndexer

parchIndexer = StringIndexer(inputCol="Parch", outputCol="Parch_Ind").fit(feature_df)
sibspIndexer = StringIndexer(inputCol="SibSp", outputCol="SibSp_Ind").fit(feature_df)
passangerIndexer = StringIndexer(inputCol="PassengerClasses", outputCol="PassengerClasses_Ind").fit(feature_df)
survivedIndexer = StringIndexer(inputCol="Survived", outputCol="Survived_Ind").fit(feature_df)

After indexing and dropping the old string-formatted columns, the DataFrame contains only numerical variables. Since all the columns have a non-string format, we can generate a feature vector from the columns in the DataFrame. The `VectorAssembler` is applied to combine them into a single `features` vector column.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["PassengerClasses", "SibSp", "Parch"],
    outputCol="features")

The next step after creating the feature vector is to split the data into train and test sets. You can use the `randomSplit` function to achieve this.

(train, test) = feature_df.randomSplit([0.8, 0.2], seed=345)

Before applying the prediction algorithm, a classifier must be defined and a pipeline built.

Let's define these steps together. First, let's select a classifier from MLlib's built-in algorithms. After adding the import statement, the classifier can be created by assigning the `labelCol` and `featuresCol` parameters.

from pyspark.ml.classification import DecisionTreeClassifier

classifier = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")

In this step, a pipeline is created by adding the assembler and the classifier to `stages` accordingly.

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, classifier])

When the pipeline is established, the parameters of the classifier can be optimized with the help of `ParamGridBuilder`. Appropriate parameters will be selected after the grid search.

from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [10, 20]) \
    .addGrid(classifier.maxBins, [50, 100]) \
    .build()

Next, a `TrainValidationSplit` is defined to run the grid search. For this purpose, the corresponding label column, prediction column, and evaluation metric are passed to its evaluator.

from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="Survived",
        predictionCol="prediction",
        metricName="weightedPrecision"),
    trainRatio=0.8)

After the `TrainValidationSplit` stage is completed, we are ready to fit the model.

model = tvs.fit(train)

Model evaluation

As a model evaluation method, the metric `accuracy` can be applied. The mathematical formula of `accuracy` is as follows.

(TruePositive + TrueNegative) /
(TruePositive + TrueNegative + FalsePositive + FalseNegative)

With the line of code below, we can obtain the validation metric for each parameter combination.

list(zip(model.validationMetrics, model.getEstimatorParamMaps()))
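To evaluate the best model on the held-out split, a short sketch (assuming the `model` and `test` variables created above, with `Survived` as the label column) can compute the accuracy directly:

# Evaluate the best model found by TrainValidationSplit on the test split
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = model.transform(test)  # the fitted TrainValidationSplitModel applies its best model
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy")
print("Test accuracy:", accuracy_evaluator.evaluate(predictions))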

Serving Apache Spark machine learning models

Machine learning models generated using PySpark can be served using MLflow. In the following sections, the installation of the MLflow package will be explained. In addition, the model serving methodology will be presented with some sample scripts added at the end of the concept descriptions.

Installing MLflow for Spark Model Serving

MLflow can be used as the model serving library for PySpark models. Installation of the library is required to use MLflow in the Spark session. For PySpark, the package can be installed by using the following command.

pip install mlflow

After installing MLflow, import its `spark` module.

from mlflow import spark

Serving Spark Model with MLflow

Execute the `start_run()` function after importing MLflow to activate MLflow in a Spark session.

import mlflow
from mlflow import spark

with mlflow.start_run():
    mlflow.spark.log_model(model, "sparkML-model")

After executing the `log_model` operation, MLflow creates the `artifacts`, `metrics`, `params`, and `tags` folders for the run.

Fig 10. The desktop-based mlflow folder structure. Owned by the author.

In the `artifacts` folder, you'll find the `sparkML-model` folder, which contains `metadata` and `stages` folders. `stages` records the stages of the saved pipeline; there can be a single stage or multiple stages. `metadata`, on the other hand, holds the descriptive information about the model.
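The logged model can also be loaded back from the run's artifact path with `mlflow.spark.load_model`; a sketch with a hypothetical run id:

# Load the logged model back from the MLflow run (the run id below is hypothetical)
import mlflow.spark

loaded_model = mlflow.spark.load_model("runs:/<run_id>/sparkML-model")
loaded_model.transform(test).select("prediction").show(5)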

Fig 11. The metadata and stages folder structure. Owned by the author.
Fig 12. The sparkML folder structure. Owned by the author.

The stages folder contains the `bestModel` information.

Fig 13. The stages folder path. Owned by the author.

Below, you can find a snippet of the `MLmodel` file that is saved under the `sparkML-model` folder.

Fig 14. The MLmodel output. Owned by the author.

Inferences can be generated using the `mlflow.pyfunc` module. Firstly, model and dataset paths are defined separately. Secondly, a Spark UDF is defined by using the model path. Thirdly, the dataset is read and registered into a dataframe. For the last step, a new column is produced with the help of previously defined Spark UDF by selecting the requested columns.

import mlflow.pyfunc
from pyspark.sql import SQLContext

train.toPandas().to_csv('dataset.csv')

model_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/mlruns/1/51ef199ab3b945e8a31b47cdfbf60912/artifacts/sparkML-model'
titanic_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/dataset.csv'
titanic_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("inferSchema", True).option("header", "true").option("delimiter", ";").load(titanic_path)

columns = ['PassengerClasses', 'SibSp', 'Parch']

df.withColumn('Inferences', titanic_udf(*columns)).show(truncate=False)
Fig 15. The mlFlow output. Owned by the author.

Metadata store and experiment tracking with Layer

To finalize an ideal model, you need to iteratively alter model parameters or data preparation methods to reach the best solution. At each iteration, it is critical to keep track of the parameters and processes used, since you may experiment with various settings at each stage. Failing to do so can lead to repeated experiments, which wastes time and computing resources.

Experiment tracking can be defined as the technique of keeping track of all this information.

A metadata store, a tool that can keep track of and retain all the information stated above, is necessary for successful experiment tracking. Data produced while creating machine learning models is saved in a metadata store. Storing experiment metadata makes ML experiments comparable and repeatable.

In contrast to existing metadata stores, Layer also supports seamless interaction between local and remote development. For instance, you may train the model with more powerful computational resources on Layer's infrastructure after doing a few fast trials using local resources. Whether you are using local or remote mode, Layer versions your metadata and logs.

With `layer.log()` and the `@dataset` and `@model` decorators, you can keep track of your findings, datasets, and models as the project progresses. Your entities are automatically versioned by Layer so that you can accurately repeat experiments and see their progression.

With common repositories for your datasets, ML models, metadata, and documentation, you can bridge the collaboration gap and simplify new recruit onboarding. Log parameters, charts, metrics, and more; identify ML entities fast; produce project documentation; and effortlessly transfer control.

Unstable production pipelines behave in unwanted ways. As a sanity check for production pipelines, Layer automatically versions datasets and models.

Complex projects are often constrained by a lack of computational power. You can train your models on powerful pre-defined GPUs and CPUs using Layer's cloud resources, and the Layer SDK handles all the pain and complexity of the infrastructure. You may also use your own resources if you don't need that extra power.

A few Python decorators provided by Layer produce a robust model and data registry. It offers a central location to manage all data related to the creation of your machine learning models.

To help you get started quickly, the Layer community website offers a variety of sample projects and datasets, and the GitHub repository has further examples as well. You can start building your experiment tracking and metadata store setup by visiting layer.ai.


Final thoughts

Throughout this article, a wide range of topics was presented by first describing each concept and then implementing the solution with sample scripts. The topics started with an introduction to Spark and continued with building and serving machine learning models in Spark. They included, but were not limited to:

  • The concept of Spark
  • Spark components and its unique architecture
  • Installations of Spark and Python
  • The essentials of RDD and its operations
  • Spark transformations and actions by their varying functions
  • Spark session generation and DataFrames
  • Exploratory data analysis with Spark
  • Machine learning in PySpark
  • Data preparation, preprocessing, and feature engineering using PySpark
  • Model building stages in PySpark
  • Model serving with Apache Spark
  • Metadata store and experiment tracking with Layer SDK
