Stop using Pandas and start using Spark with Scala

Why Data Scientists and Engineers should think about using Spark with Scala as an alternative to Pandas and how to get started

Chloe Connor
Towards Data Science


Source: https://unsplash.com/photos/8IGKYypIZ9k

Moving from Pandas to Spark with Scala isn’t as challenging as you might think, and as a result your code will run faster and you’ll probably end up writing better code.

In my experience as a Data Engineer, I’ve found building data pipelines in Pandas often requires us to regularly increase resources to keep up with the increasing memory usage. In addition, we often see many runtime errors due to unexpected data types or nulls. As a result of using Spark with Scala instead, solutions feel more robust and easier to refactor and extend.

In this article we’ll run through the following:

  1. Why you should use Spark with Scala over Pandas
  2. How the Scala Spark API really isn’t too different from the Pandas API
  3. How to get started using either a Jupyter notebook or your favourite IDE

What is Spark?

  • Spark is an Apache open-source framework
  • It can be used as a library and run on a “local” cluster, or run on a Spark cluster
  • On a Spark cluster the code can be executed in a distributed way, with a single master node and multiple worker nodes that share the load
  • Even on a local cluster you will still see performance improvements over Pandas, and we’ll go through why below

Why use Spark?

Spark has become popular due to its ability to process large data sets at speed.

  • By default, Spark is multi-threaded whereas Pandas is single-threaded
  • Spark code can be executed in a distributed way, on a Spark Cluster, whereas Pandas runs on a single machine
  • Spark is lazy, which means it will only execute when you collect (i.e. when you actually need something returned); in the meantime it builds up an execution plan and finds the optimal way to execute your code (see the short sketch after this list)
  • This differs from Pandas, which is eager, and executes each step as it reaches it
  • Spark is also less likely to run out of memory as it will start using disk when it reaches its memory limit
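
To make the laziness point concrete, here is a minimal sketch, assuming a SparkSession called spark is already available (as it is in spark-shell or a notebook):

import org.apache.spark.sql.functions.col

val numbers = spark.range(1000000)               // a Dataset of Longs; nothing is computed yet
val evens = numbers.filter(col("id") % 2 === 0)  // still lazy: Spark is only building an execution plan
val evenCount = evens.count()                    // count() is an action, so execution happens here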

For a visual comparison of run times, see the chart from Databricks linked below, which shows Spark running significantly faster than Pandas, and Pandas running out of memory at a smaller data size.

Source: https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html

Spark has a rich ecosystem

  • Built-in libraries for data science, such as Spark ML for machine learning and GraphX for graph algorithms
  • Spark Streaming for real-time data processing
  • Interoperability with other systems and file formats (ORC, Parquet, etc.)

Why use Scala instead of PySpark?

Spark provides a familiar API, so using Scala instead of Python won’t feel like a huge learning curve. Here are a few reasons why you might want to use Scala:

  • Scala is a statically typed language, which means you’ll find your code will likely have fewer runtime errors than with Python
  • Scala also allows you to create immutable objects, which means when referencing an object you can be confident its state hasn’t been mutated in between creating it and calling it (see the short example after this list)
  • Spark is written in Scala, so new features are available in Scala before Python
  • For Data Scientists and Data Engineers working together, using Scala can help with collaboration, because of the type safety and immutability of Scala code
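
As a small illustration of the immutability point above, vals in Scala cannot be reassigned once created; this is plain Scala rather than anything Spark-specific:

val league: String = "Premier League"
// league = "Championship"   // does not compile: reassignment to val

var points: Int = 0           // a var opts in to mutability explicitly
points = 3                    // allowed, but vals are preferred in most Spark code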

Spark core concepts

  • DataFrame: a Spark DataFrame is a data structure that is very similar to a Pandas DataFrame
  • Dataset: a Dataset is a typed DataFrame (in the Scala API, a DataFrame is just an alias for Dataset[Row]), which can be very useful for ensuring your data conforms to your expected schema
  • RDD: this is the core data structure in Spark, upon which DataFrames and Datasets are built

In general, we’ll use Datasets where we can, because they’re type safe, more efficient, and improve readability as it’s clear what data we can expect in the Dataset.

Datasets

To create our Dataset we first need to create a case class, which is similar to a data class in Python, and is really just a way to specify a data structure.

For example, let’s create a case class called FootballTeam, with a few fields:

case class FootballTeam(
  name: String,
  league: String,
  matches_played: Int,
  goals_this_season: Int,
  top_goal_scorer: String,
  wins: Int
)

Now, let’s create an instance of this case class:

val brighton: FootballTeam =
  FootballTeam(
    name = "Brighton and Hove Albion",
    league = "Premier League",
    matches_played = 29,
    goals_this_season = 32,
    top_goal_scorer = "Neil Maupay",
    wins = 6
  )

Let’s create another instance called manCity, and then we’ll create a Dataset with these two FootballTeams.
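
A sketch of the manCity instance might look like this (the values are made up for illustration):

val manCity: FootballTeam =
  FootballTeam(
    name = "Manchester City",
    league = "Premier League",
    matches_played = 28,
    goals_this_season = 68,
    top_goal_scorer = "Sergio Aguero",
    wins = 19
  )

With both instances in place, we can create the Dataset: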

val teams: Dataset[FootballTeam] =
  spark.createDataset(Seq(brighton, manCity))

Another way to do this is:

val teams: Dataset[FootballTeam] =
  spark.createDataFrame(Seq(brighton, manCity)).as[FootballTeam]

The second way can be useful when reading from an external data source that returns a DataFrame, as you can then cast it to your Dataset, giving you a typed collection.

Data transformations

Most (if not all) of the data transformations you can apply to Pandas DataFrames are available in Spark. There are of course differences in syntax, and sometimes additional things to be aware of, some of which we’ll go through now.

In general, I’ve found Spark more consistent in notation compared with Pandas, and because Scala is statically typed, you can often just type myDataset. and let autocompletion tell you what methods are available!

Let’s start with a simple transformation, where we just want to add a new column to our Dataset and assign it a constant value. In Pandas this looks like:

Pandas:
df_teams['sport'] = 'football'

In Spark there’s a small difference beyond syntax: assigning a constant value to the new column requires us to import a Spark function called lit.

Spark:
import org.apache.spark.sql.functions.lit

val newTeams = teams.withColumn("sport", lit("football"))

Note that we’ve created a new object as our original teams dataset is a val, which means it’s immutable. This is a good thing as we know that whenever we use our teams Dataset, we always get the same object.
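
As a quick check (assuming the snippets above have been run), comparing the two schemas shows the original is untouched:

newTeams.printSchema()  // includes the new "sport" column
teams.printSchema()     // the original teams Dataset is unchanged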

Now let’s add a column based on a function. In Pandas this will look like:

Pandas:
def is_prem(league):
    if league == 'Premier League':
        return True
    else:
        return False

df_teams['premier_league'] = df_teams['league'].apply(lambda x: is_prem(x))

To do the same in Spark, we need to wrap the function so that Spark can apply it to each row. This is done using a UserDefinedFunction (UDF). We’ve also used a match expression, as this is a nicer implementation in Scala than an if-else, but either will work.

We will also need to import another useful Spark function, col, which is used to refer to a column.

Spark:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

def isPrem(league: String): Boolean =
  league match {
    case "Premier League" => true
    case _ => false
  }

val isPremUDF: UserDefinedFunction =
  udf[Boolean, String](isPrem)

val teamsWithLeague: DataFrame = teams.withColumn("premier_league",
  isPremUDF(col("league")))

Now that we’ve added a new column that isn’t in our case class, the result is converted back to a DataFrame. So we either need to add another field to our original case class (and allow it to be nullable, using Option), or create a new case class.
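
As a minimal sketch of the second approach (the case class name is just an illustration), we can define a new case class that includes the extra field and cast back to a typed Dataset:

case class FootballTeamWithFlag(
  name: String,
  league: String,
  matches_played: Int,
  goals_this_season: Int,
  top_goal_scorer: String,
  wins: Int,
  premier_league: Boolean
)

val typedTeams: Dataset[FootballTeamWithFlag] =
  teamsWithLeague.as[FootballTeamWithFlag]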

An Option in Scala just means the field is nullable. If the value is missing we use None, and if it’s populated we use Some("value"). An example of an optional string:

val optionalString: Option[String] = Some("something")

To get the string from this we can call optionalString.get, which will just return "something" (though it will throw an exception if the value is None). If we’re not sure whether the value will be populated, we can use optionalString.getOrElse("nothing"), which will return the string "nothing" if it is None.
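
Another common way to handle an Option, in keeping with the match expressions used above, is to pattern match on it; a small sketch:

val displayValue: String = optionalString match {
  case Some(value) => value     // use the wrapped value if it is present
  case None        => "nothing" // fall back when the value is missing
}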

Filtering a Dataset is another common requirement, and it’s a good example of where Spark is more consistent than Pandas, as it follows the same pattern as other transformations: dataset “dot” transformation (i.e. dataset.filter(...)).

Pandas:
df_teams = df_teams[df_teams['goals_this_season'] > 50]

Spark:
val filteredTeams = teams.filter(col("goals_this_season") > 50)

We are likely to need to perform some aggregations on our dataset, which is very similar in Pandas and Spark.

Pandas:
df_teams.groupby(['league']).count()

Spark:
teams.groupBy("league").count()

For multiple aggregations we can again do something similar to Pandas, passing a map of field name to aggregation. If we want to define our own aggregations we can use user-defined aggregate functions (UDAFs).

teams.agg(Map(
  "matches_played" -> "avg",
  "goals_this_season" -> "count"
))

Often we also want to combine multiple Datasets, which may be with union:

Pandas:
pd.concat([teams, another_teams], ignore_index=True)

Spark:
teams.unionByName(anotherTeams)

… or with a join:

val players: Dataset[Player] = spark
  .createDataset(Seq(neilMaupey, sergioAguero))

teams.join(players,
  teams.col("top_goal_scorer") === players.col("player_name"),
  "left"
).drop("player_name")

In this example we have also created a new Dataset, this time using a case class called Player. Note that this case class has a field injury, which can be null.

case class Player(
  player_name: String,
  team: String,
  goals_scored: Int,
  injury: Option[String]
)

Notice that we’ve dropped the player_name column as this will be a duplicate of top_goal_scorer.
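
For completeness, the two Player instances referenced above could be created like this; the values are made up purely for illustration:

val neilMaupey: Player = Player(
  player_name = "Neil Maupay",
  team = "Brighton and Hove Albion",
  goals_scored = 8,
  injury = Some("ankle")
)

val sergioAguero: Player = Player(
  player_name = "Sergio Aguero",
  team = "Manchester City",
  goals_scored = 16,
  injury = None
)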

We may also want parts of our code to just use Scala native data structures such as Arrays, Lists, etc. To get one of our columns as an Array, we need to map to our value and call .collect().

val teamNames: Array[String] = teams.map(team => team.name)
  .collect()

Note that we’re able to use the case class’s built-in getters to return the name field, and this won’t compile if name is not a field in our FootballTeam class.

As an aside, we can add functions to our case classes too, and both values and functions will come up as options for autocompletion when using an IDE such as IntelliJ, or VS Code with the Metals plugin.
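
For instance, here is a sketch of the earlier FootballTeam case class with a method added (winRate is purely illustrative):

case class FootballTeam(
  name: String,
  league: String,
  matches_played: Int,
  goals_this_season: Int,
  top_goal_scorer: String,
  wins: Int
) {
  // illustrative method: the fraction of played matches that were won
  def winRate: Double = wins.toDouble / matches_played
}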

To filter our Dataset based on whether a column’s value exists in this Array, we need to expand the Array into a sequence of arguments (varargs) by calling : _*.

val filteredPlayers: Dataset[Player] = players
  .filter(col("team").isin(teamNames: _*))

Running some code

At this point hopefully you’re keen to have a go at writing some Spark code, even if just to see whether my claim that it’s not too different from Pandas stands up.

To get started, we have a couple of options. We can use a notebook, which is a quick way to get some data and start playing around. Alternatively, we can set up a simple project. Either way you’ll need Java 8 installed.

Notebook

For this example we’re going to use the spylon kernel (https://pypi.org/project/spylon-kernel/) in a Jupyter notebook. First run the following commands to set up your notebook; the last command should open the notebook in a browser. Then select the spylon-kernel from your available kernels.

pip install spylon-kernel
python -m spylon_kernel install
jupyter notebook

Let’s check we have the correct Java version by adding the following to a cell:

!java -version

The output should be:

java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

If not, check JAVA_HOME in your bash profile, and ensure it’s pointing to Java 8.

The next step is to install some dependencies. To do this we can add the following code snippet to a new cell. This sets up some Spark config and also allows you to add dependencies. Here I’ve added a visualisation library called Vegas.

%%init_spark
launcher.num_executors = 4
launcher.executor_cores = 2
launcher.driver_memory = '4g'
launcher.conf.set("spark.sql.catalogImplementation", "hive")
launcher.packages = ["org.vegas-viz:vegas_2.11:0.3.11",
"org.vegas-viz:vegas-spark_2.11:0.3.11"]

To connect to our data source we can define a function, perhaps something like this:

import org.apache.spark.sql.DataFrame

def getData(file: String): DataFrame =
  spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true") // infer column types so numeric fields can later be cast to the case class
    .load(file)

This reads from a csv file, but there are lots of other data sources we can connect to. This function returns a DataFrame, which we may want to convert to a Dataset:

val footballTeams: Dataset[FootballTeam] =
  getData("footballs_teams.csv").as[FootballTeam]

We can then start working with this data and have a go at some of the data transformations we discussed, and many more.
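
For example, as a quick sanity check (assuming the FootballTeam case class and the footballTeams Dataset from above):

import org.apache.spark.sql.functions.col

footballTeams.show(5)   // peek at the first few rows

footballTeams
  .filter(col("league") === "Premier League")
  .groupBy("league")
  .count()
  .show()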

Setting up a project

Now that you’ve had a go at playing around with some data, you might want to set up a project.

The two main things to include:

  • build.sbt - where previously we added our dependencies in one of our notebook cells, now we need to add them to our build.sbt file
  • SparkSession - in the notebook we already had a spark session, which meant we were able to do things such as spark.createDataFrame. In our project we need to create this spark session

Example build.sbt:

name := "spark-template"
version := "0.1"
scalaVersion := "2.12.11"
val sparkVersion = "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

Example SparkSession:

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  val spark: SparkSession = SparkSession
    .builder()
    .master("local")
    .appName("spark-example")
    .getOrCreate()
}

We can then extend objects with this wrapper, which gives us a spark session.

object RunMyCode extends SparkSessionWrapper {
  // your code here
}

You can then start writing your spark code!
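
To tie it all together, here is a minimal sketch of what RunMyCode might look like, assuming the FootballTeam case class from earlier is in scope; the data is made up for illustration:

object RunMyCode extends SparkSessionWrapper {
  def main(args: Array[String]): Unit = {
    import spark.implicits._ // needed for the createDataset encoder

    val teams = spark.createDataset(Seq(
      FootballTeam("Brighton and Hove Albion", "Premier League", 29, 32, "Neil Maupay", 6),
      FootballTeam("Manchester City", "Premier League", 28, 68, "Sergio Aguero", 19)
    ))

    teams.show()
    spark.stop()
  }
}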

To conclude, Spark is a great tool for fast data processing and is growing ever more popular in the data world. As a result, Scala is also becoming a more popular language, and thanks to its type safety it can be a good choice for data engineers and data scientists who may be more familiar with Python and Pandas. Spark is also a great introduction to the language, because we can use familiar concepts such as DataFrames, so it doesn’t feel like a huge learning curve.

Hopefully this has given you a quick overview, and perhaps enabled you to start exploring Spark, either within your notebook, or within your new project. Good luck!
