A Journey Into Big Data with Apache Spark — Part 2

An introduction to building your first Apache Spark application with Scala to read from CSV files.

Ash Broadley
Towards Data Science


A tabular view of a DataFrame from a CSV file in Apache Spark

Welcome back for the second part of (what I hope to be) a series of posts about getting to know Apache Spark.

In the first episode, we learned how to create and run an Apache Spark cluster using Docker. If you haven’t read that yet, you can do so here. I’ll be using that cluster to run my Spark application against, so it will be quite useful for you to have it up and running.

I’ll be using Scala to build the application because I also want to get to know that. Scala isn’t something I’ve used much at all, so please do bear with me while I figure some things out :). If there’s a better way of doing anything I do, please do let me know — I’m always open to feedback!

Before We Start

This isn’t going to be an in-depth look at how to set up and configure a Scala application, as the aim is to get up and running quickly and dive into the world of Spark. The Scala world does make this easy by offering a tool called SBT, the Scala Build Tool, and there are a few things worth noting that will make it simpler still.

Setting Up Our Environment

If you’ve read my previous post, you’ll know I’m a huge Docker fan and, in true fanboy style, I’ll be using a Docker image that already contains Scala and SBT to aid my development. The Dockerfile I’ll be using can be found here. This Dockerfile is great because it takes build arguments that install whichever versions of Scala and SBT you specify (many thanks to the original authors & contributors!). I’ve added my own little twist to the Dockerfile, mainly setting the WORKDIR to /project (as that’s where I mount my project directory, i.e. my code) and adding a CMD to launch into the sbt console when we start the container. We can build the image by running the following command:

docker build -t ls12styler/scala-sbt:latest \
--build-arg SCALA_VERSION=2.12.8 \
--build-arg SBT_VERSION=1.2.7 \
github.com/ls12styler/scala-sbt

You can see we’re tagging the built image as ls12styler/scala-sbt:latest, so we can run it with the following command, which lands us in a bash shell in our previously configured WORKDIR:

docker run -it --rm ls12styler/scala-sbt:latest /bin/bash

We can validate the installations by running scala -version and sbt sbtVersion, leading to the following output:

bash-4.4# scala -version
Scala code runner version 2.12.8 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.
bash-4.4# sbt sbtVersion
[warn] No sbt.version set in project/build.properties, base directory: /
[info] Set current project to root (in build file:/)
[info] 1.2.7

In order for us to have access to our local files, we need to mount a volume from our working directory to somewhere on the running container. We can do this by simply adding the -v option to our run command. We’ll remove the /bin/bash so we land right into the sbt console:

docker run -it --rm -v `pwd`:/project ls12styler/scala-sbt:latest

All we’ve done here is mount the pwd (present working directory) under the /project directory on the container. When running the above, we’ll end up in the SBT console on that path:

[warn] No sbt.version set in project/build.properties, base directory: /project
[info] Set current project to project (in build file:/project/)
[info] sbt server started at local:///root/.sbt/1.0/server/07b05e14c4489ea8d2f7/sock
sbt:project>

Setting Up Our Project

Create a new file in your project directory called build.sbt and open it with your favourite editor. Fill it with the below contents, which are actually borrowed from the official Spark documentation, although slightly tweaked:

name := "MyFirstScalaSpark"
version := "0.1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

This gives us a minimal project definition to get started with. NOTE: We’ve specified the Scala version as 2.11.12 because the Spark 2.4.0 distribution we’re running against is compiled against Scala 2.11, whereas the version of Scala on the container is 2.12. In the SBT console, run the reload command to refresh the SBT project with the new build settings:

sbt:local> reload
[info] Loading project definition from /project/project
[info] Loading settings for project project from build.sbt ...
[info] Set current project to MyFirstScalaSpark (in build file:/project/)
sbt:MyFirstScalaSpark>

You should notice the console take on the name of our project: MyFirstScalaSpark. Now we have an environment to build our project in. Let’s write some code!

Our First Application

We’ll follow the Spark documentation a little further just to test where we’ve got to so far.

SBT applications take the standard directory structure of a Java application, so let’s create some new directories in our project directory (using the -p flag will create the directories recursively): mkdir -p ./src/main/scala/

Create a new file called MyFirstScalaSpark.scala in the newly created directory and open it in your favourite editor. Add the following contents (again, slightly tweaked from the original):

package com.example

import org.apache.spark.sql.SparkSession

object MyFirstScalaSpark {
  def main(args: Array[String]) {
    val SPARK_HOME = sys.env("SPARK_HOME")
    val logFile = s"${SPARK_HOME}/README.md"
    val spark = SparkSession.builder
      .appName("MyFirstScalaSpark")
      .getOrCreate()

    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()

    println(s"Lines with a: $numAs, Lines with b: $numBs")

    spark.stop()
  }
}

As we’ll be running this application on the cluster that we created in Part 1, we know that the $SPARK_HOME environment variable will be set and point to the right directory on the Spark Workers. In the code above, we retrieve the contents of $SPARK_HOME (which should be /spark) and interpolate it into the file path for the README.md that ships with the Spark distribution we’re using. We then create our Spark session, run a couple of filter-and-count operations to count the lines containing the letter a and the lines containing the letter b, and print those counts to the console.
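One small defensive tweak worth considering (my own addition, not something the tutorial needs): sys.env("SPARK_HOME") will throw a NoSuchElementException if the variable isn’t set, so we could fall back to a sensible default instead:

    // Hypothetical variation: fall back to /spark (the path our cluster image uses)
    // if SPARK_HOME isn't set, rather than throwing a NoSuchElementException.
    val sparkHome = sys.env.getOrElse("SPARK_HOME", "/spark")
    val logFile = s"${sparkHome}/README.md"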

Now that we’ve got some code to actually compile, we can create a jar that we’ll submit to the Spark cluster. In the SBT console, simply run package to generate the jar. You should see some output akin to the following:

sbt:MyFirstScalaSpark> package
[info] Updating ...
[info] Done updating.
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /project/target/scala-2.11/classes ...
[info] Done compiling.
[info] Packaging /project/target/scala-2.11/myfirstscalaspark_2.11-0.1.0.jar ...
[info] Done packaging.
[success] Total time: 11 s, completed Dec 27, 2018 3:22:16 PM

As you can see, the jar has been output at /project/target/scala-2.11/myfirstscalaspark_2.11-0.1.0.jar within the container, which means that locally, we can find the jar in `pwd`/target/scala-2.11/.
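A quick aside while we’re looking at the jar: sbt package only bundles our own classes, not our dependencies, which is fine here because the cluster already ships with Spark. If you ever move to building a fat jar (with something like sbt-assembly), a common tweak (optional, and not something we need in this series) is to mark the Spark dependency as provided in build.sbt so it isn’t baked into the assembly:

// "provided" keeps spark-sql out of an assembled fat jar; only relevant if you build one.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"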

Submitting Our Application

Now it’s time to bring the cluster that we created in Part 1 back to life! Find the directory that contains the docker-compose.yml and run:

docker-compose up --scale spark-worker=2

This will bring up a Spark Master and two Spark Workers, which should be plenty to demonstrate that our first application actually works.

In our project directory, we can reuse the same Docker image we created for the cluster as our Spark Driver. The Spark Driver is the process from which we submit our applications to the Spark cluster, and we can launch it using the following command:

docker run --rm -it -e SPARK_MASTER="spark://spark-master:7077" \
-v `pwd`:/project --network docker-spark_spark-network \
ls12styler/spark:latest /bin/bash

In this command, we set the contents of the $SPARK_MASTER environment variable, mount the pwd under /project on the container, attach it to the Docker network we created and drop into a bash shell. From there, we submit our application with spark-submit:

bash-4.4# spark-submit --master $SPARK_MASTER \
--class com.example.MyFirstScalaSpark \
/project/target/scala-2.11/myfirstscalaspark_2.11-0.1.0.jar

When we submit the application, we specify the URI of the Spark Master, the name of the class to run and the jar that class resides in. As we launched the container in our project directory, the Spark Driver container can access the jar we built without us having to copy it around the underlying filesystem. Amidst the logs produced when submitting to Spark, you’ll see the line we construct in the code:

...
Lines with a: 62, Lines with b: 31
...

And if we check the logs of the cluster, we’ll see something like the following:

spark-master    | 2018-12-27 16:24:10 INFO  Master:54 - Registering app MyFirstScalaSpark
spark-master | 2018-12-27 16:24:10 INFO Master:54 - Registered app MyFirstScalaSpark with ID app-20181227162410-0005
spark-master | 2018-12-27 16:24:10 INFO Master:54 - Launching executor app-20181227162410-0005/0 on worker worker-20181227134310-172.21.0.4-39747
spark-master | 2018-12-27 16:24:10 INFO Master:54 - Launching executor app-20181227162410-0005/1 on worker worker-20181227134310-172.21.0.3-42931
spark-worker_1 | 2018-12-27 16:24:10 INFO Worker:54 - Asked to launch executor app-20181227162410-0005/0 for MyFirstScalaSpark
spark-worker_2 | 2018-12-27 16:24:10 INFO Worker:54 - Asked to launch executor app-20181227162410-0005/1 for MyFirstScalaSpark

This shows our application being registered with the Master and given an ID. Executors are then launched on each Worker and then a bunch of other stuff happens while our application is running.
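As an aside, the master doesn’t have to come from the --master flag: SparkSession.builder can also take it in code. I prefer passing it at submit time (as we’re doing here) so the jar stays portable across clusters, but for completeness, a hard-coded sketch would look like this (using the same spark-master address as our Docker network):

    // Alternative (not used in this series): set the master in code instead of via spark-submit.
    val spark = SparkSession.builder
      .appName("MyFirstScalaSpark")
      .master("spark://spark-master:7077")
      .getOrCreate()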

We’ve successfully built our first Scala based Spark application and run it on the cluster we built in Part 1. Congratulations!

Just One Other Thing…

There’s a slight gotcha right now: this only works because we got lucky and chose a file that is available to ALL containers that use the image we built in Part 1 (Master, Worker & Driver). If we want to access a file that isn’t bundled in the image, e.g. something on the host filesystem, we need to share that filesystem with the Spark Workers. This is easily achieved by mounting a volume in docker-compose when we bring up the cluster. Open the docker-compose.yml in your editor and add the following YAML at the end of the Worker service declaration:

    volumes:
      - "./:/local"

Save the file and bring the cluster back up. Now we have a shared directory across our Spark Workers. Next, we need to share that same directory with the Driver (the container we submit our application from). This is really only for convenience, so we can use bash autocompletion to build the file path we pass as an argument to our application. We can do that by updating our run command to include the new volume (assuming the directory you ran docker-compose up from is at the same level as your project directory):

docker run --rm -it -e SPARK_MASTER="spark://spark-master:7077" \
-v `pwd`:/project -v `pwd`/../docker-spark:/local \
--network docker-spark_spark-network -w /project \
ls12styler/spark:latest /bin/bash

This container now has both our project directory and the shared data directory, accessible at /project and /local respectively.

Reading a CSV File in Spark

In the first iteration of our application, we used Spark’s read.textFile function to load a README that’s already available to the Workers. For this next iteration, we’re going to use read.csv, which loads a CSV file into a DataFrame we can perform operations on. I’m going to use the UK Space Agency spending report: October 2018 data for the rest of this post, which I’ll be putting into the directory mounted under /local on the containers. To start with, we’ll simply use the count method to see how many lines are in the file. We’ll also pass the path of the CSV file into the application via the command line arguments passed to the jar.

In your editor, open the MyFirstScalaSpark.scala file and add the following code:

package com.example

import org.apache.spark.sql.SparkSession

object MyFirstScalaSpark {
  def main(args: Array[String]) {
    val spark = SparkSession.builder
      .appName("MyFirstScalaSpark")
      .getOrCreate()

    val filePath = args(0)
    val data = spark.read
      .csv(filePath)

    println(data.count)

    spark.stop()
  }
}

We’re really only adding the use of the command line arguments to specify the file path, which Spark then loads into a DataFrame. We then print the number of rows in that DataFrame to the console. Run package in the SBT console again to build a new version of our application, then submit it, this time passing in the path to the dataset we’re using:

spark-submit --master $SPARK_MASTER \
--class com.example.MyFirstScalaSpark \
target/scala-2.11/myfirstscalaspark_2.11-0.1.0.jar \
/local/UKSA_Oct_18_-_Transparency_Data.csv

Amidst the log output from Spark, you should see the number 689. A quick check on the command line shows the same:

bash-4.4# wc -l /local/UKSA_Oct_18_-_Transparency_Data.csv
689 /local/UKSA_Oct_18_-_Transparency_Data.csv
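Incidentally, if you forget to pass the file path, args(0) throws an ArrayIndexOutOfBoundsException that ends up buried in the Spark logs. A small guard at the top of main (my own addition, not part of the walkthrough above) makes the failure much friendlier:

    // Hypothetical guard: fail fast with a usage message if no CSV path was supplied.
    if (args.isEmpty) {
      System.err.println("Usage: MyFirstScalaSpark <path-to-csv>")
      sys.exit(1)
    }
    val filePath = args(0)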

Getting the Right Structure

However, that line count isn’t quite what we want if we’re after a somewhat “table”-like representation of the data. Checking the first line of the CSV file shows that it contains column headers, so we only really have 688 rows of actual data. Let’s take a look at the actual structure of the data we’ve loaded. In our code, we can add data.printSchema to do this:

    ...
    val data = spark.read
      .csv(filePath)

    data.printSchema
    ...

package and submit again to see output something like the below:

root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)

This doesn’t tell us very much. There are 11 columns, all of type string, and the columns aren’t named very well. When reading CSV files with Spark, we can specify an option that uses the first line as the column headings and treats the remainder as the rows of our “table”. We add it to our application by extending the read call:

    val data = spark.read
      .option("header", true)
      .csv(filePath)

Run package to build a new jar and then submit it to the cluster again. We should now see the number 688 printed in the logs and the below schema. You can see that we now have named columns instead of them being named by position:

root
|-- Department: string (nullable = true)
|-- Entity: string (nullable = true)
|-- Date of Payment: string (nullable = true)
|-- Expense Type: string (nullable = true)
|-- Expense Area: string (nullable = true)
|-- Supplier: string (nullable = true)
|-- Transaction Number: string (nullable = true)
|-- Amount: string (nullable = true)
|-- Description: string (nullable = true)
|-- Supplier Post Code: string (nullable = true)
|-- Supplier Type: string (nullable = true)

So we’ve now got the right number of rows and a structure that has named columns — Nice! Take a closer look at the schema — all our values are of the type string. Again, this isn’t very helpful. We should be working with proper types. Luckily, Spark comes with another option to try and best guess the schema of the file: inferSchema. We can add this to our code just as we did for the first option:

    val data = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv(filePath)

package and submit and you’ll get a slightly improved version of the schema:

root
|-- Department: string (nullable = true)
|-- Entity: string (nullable = true)
|-- Date of Payment: string (nullable = true)
|-- Expense Type: string (nullable = true)
|-- Expense Area: string (nullable = true)
|-- Supplier: string (nullable = true)
|-- Transaction Number: integer (nullable = true)
|-- Amount: double (nullable = true)
|-- Description: string (nullable = true)
|-- Supplier Post Code: string (nullable = true)
|-- Supplier Type: string (nullable = true)

Most notably, the Transaction Number and Amount fields are now integer and double types respectively. Everything else remains a string, even the Date of Payment column. Let’s be pedantic and get that to be of a timestamp type. Yet again, Spark comes to the rescue! We can add another option detailing the format of the column that contains the date:

    val data = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .option("timestampFormat", "dd/MM/yyyy")
      .csv(filePath)

And the resulting schema should resemble the below:

root
|-- Department: string (nullable = true)
|-- Entity: string (nullable = true)
|-- Date of Payment: timestamp (nullable = true)
|-- Expense Type: string (nullable = true)
|-- Expense Area: string (nullable = true)
|-- Supplier: string (nullable = true)
|-- Transaction Number: integer (nullable = true)
|-- Amount: double (nullable = true)
|-- Description: string (nullable = true)
|-- Supplier Post Code: string (nullable = true)
|-- Supplier Type: string (nullable = true)

Notice the Date of Payment column is now of the timestamp type! We now have a table-like representation of our data that contains properly typed columns. Let’s take a look at our data! After the data.printSchema line, insert the following:

    ...
    data.printSchema
    data.show
    ...

Package and submit the application and in the output logs, we’ll see the first 20 rows displayed:

+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+----------+-----------+------------------+-------------+
| Department| Entity| Date of Payment| Expense Type| Expense Area| Supplier|Transaction Number| Amount|Description|Supplier Post Code|Supplier Type|
+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+----------+-----------+------------------+-------------+
|Department for Bu...|UK Space Agency|2018-10-11 00:00:00|R & D Current Gra...|UK Space Agency -...|SCIENCE AND TECHN...| 241835| 38745.0| null| SN2 1SZ| WGA ONLY|
|Department for Bu...|UK Space Agency|2018-10-17 00:00:00|R & D Printing an...|UK Space Agency -...|ENGINEERING AND P...| 250485| 2256.94| null| SN2 1FF| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-23 00:00:00|R & D Other Staff...|UK Space Agency -...|A.M. HEALTH & SAF...| 253816| 37.5| null| SN5 6BD| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-17 00:00:00|R & D Other Profe...|UK Space Agency -...| QINETIQ LTD| 254217| 33320.0| null| GU14 0LX| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-17 00:00:00|R & D Other Profe...|UK Space Agency -...| QINETIQ LTD| 254318| -5000.0| null| GU14 0LX| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-17 00:00:00|R & D Other Profe...|UK Space Agency -...| QINETIQ LTD| 254318| 10000.0| null| GU14 0LX| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00|R & D Other Profe...| UKSA - UKSA|Border Consulting...| 255630| 571.06| null| GU51 3BE| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00|R & D Other Profe...| UKSA - UKSA|Border Consulting...| 255630| 16289.65| null| GU51 3BE| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00|R & D Other Profe...| UKSA - UKSA|Border Consulting...| 255630| 16289.65| null| GU51 3BE| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-02 00:00:00|R & D Subsistence...| UKSA - UKSA|Personal Expense,...| 256572| 2.42| null| null| EMPLOYEE|
|Department for Bu...|UK Space Agency|2018-10-02 00:00:00|R & D Subsistence...| UKSA - UKSA|Personal Expense,...| 256572| 5.1| null| null| EMPLOYEE|
|Department for Bu...|UK Space Agency|2018-10-02 00:00:00|R & D Rail Travel...| UKSA - UKSA|Personal Expense,...| 256572| 35.4| null| null| EMPLOYEE|
|Department for Bu...|UK Space Agency|2018-10-02 00:00:00|R & D Subsistence...| UKSA - UKSA|Personal Expense,...| 256572| 39.97| null| null| EMPLOYEE|
|Department for Bu...|UK Space Agency|2018-10-26 00:00:00|R & D Other Profe...|UK Space Agency -...| Cabinet Office| 256868| -3570.0| null| FY5 3FW| WGA ONLY|
|Department for Bu...|UK Space Agency|2018-10-26 00:00:00|R & D Other Profe...|UK Space Agency -...| Cabinet Office| 256868| 1230.0| null| FY5 3FW| WGA ONLY|
|Department for Bu...|UK Space Agency|2018-10-26 00:00:00|R & D Other Profe...|UK Space Agency -...| Cabinet Office| 256868| 5910.0| null| FY5 3FW| WGA ONLY|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00|R & D Current Gra...|UK Space Agency -...|CENTRE NATIONAL D...| 256966| 127900.0| null| 31401| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00| R & D Contractors| UKSA - UKSA|Alexander Mann So...| 257048| 3435.24| null| EC2N 3AQ| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-01 00:00:00| R & D Contractors|UK Space Agency -...|Alexander Mann So...| 257065| 2532.19| null| EC2N 3AQ| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-02 00:00:00|R & D Current Gra...|UK Space Agency -...|SCIENCE AND TECHN...| 257213|2498072.42| null| SN2 1SZ| WGA ONLY|
+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+----------+-----------+------------------+-------------+
only showing top 20 rows

We’ve got a somewhat wide table (sorry for the formatting!), but you can hopefully see some of the values we’re going to be working with.
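By default, show truncates long values to 20 characters to keep the rows narrow, which is why most of the columns above end in “...”. If you’d rather see the full values (and don’t mind even wider output), show takes a second argument to disable truncation; this is standard DataFrame behaviour, just not something I’m using in this post:

    // Show the first 5 rows without truncating long values.
    data.show(5, false)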

Doing Something With Our Data

Now that we have a real table-like structure for our data, we can start to perform our analysis. To begin with, let’s simply order the data by the Date of Payment column, in descending order. As we’re operating on a DataFrame, Spark makes lots of functions available for this kind of operation. We can use the orderBy function to do what we want, together with the desc function, passing it the name of the column we want to sort by (desc lives in org.apache.spark.sql.functions, so it needs importing). Any operation on a DataFrame in Spark returns a new DataFrame, so we’ll assign the result to orderedData and then display it. We’ll also limit the output to 5 rows, just to keep it minimal.

    ...
    import org.apache.spark.sql.functions.desc

    data.show

    val orderedData = data.orderBy(desc("Date of Payment"))
    orderedData.show(5)
    ...

package and submit, and we should see the following output:

+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+--------+-----------+------------------+-------------+
| Department| Entity| Date of Payment| Expense Type| Expense Area| Supplier|Transaction Number| Amount|Description|Supplier Post Code|Supplier Type|
+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+--------+-----------+------------------+-------------+
|Department for Bu...|UK Space Agency|2018-10-31 00:00:00|R & D Office Supp...|UK Space Agency -...| OFFICE DEPOT UK LTD| 261459| 145.57| null| SP10 4JZ| VENDOR|
|Department for Bu...|UK Space Agency|2018-10-31 00:00:00| R & D Other Travel|UK Space Agency -...|Personal Expense,...| 261508| 6.14| null| null| EMPLOYEE|
|Department for Bu...|UK Space Agency|2018-10-31 00:00:00|R & D Other Profe...|UK Space Agency -...|Geospatial Insigh...| 261474| 13475.0| null| B46 3AD| GRANT|
|Department for Bu...|UK Space Agency|2018-10-31 00:00:00|R & D Current Gra...|UK Space Agency -...|REACTION ENGINES LTD| 261327|167000.0| null| OX14 3DB| GRANT|
|Department for Bu...|UK Space Agency|2018-10-31 00:00:00|R & D Hotel & Acc...| UKSA - UKSA|Personal Expense,...| 261505| 114.0| null| null| EMPLOYEE|
+--------------------+---------------+-------------------+--------------------+--------------------+--------------------+------------------+--------+-----------+------------------+-------------+
only showing top 5 rows

Congratulations! You’ve been able to load a CSV file using Apache Spark and learned how to perform a basic sort using the orderBy method with desc.
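If you fancy experimenting before the next post, the same pattern works for any column. For example (my own example, output not shown here), sorting by Amount surfaces the largest payments in the file:

    // Reuses the desc import from earlier; shows the 5 largest payments.
    data.orderBy(desc("Amount")).show(5)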

And we’re done for this edition! Come back next time when we’ll be looking into using Spark to answer some more business-like questions based on the data we have.
