
An Introduction to Apache Spark, PySpark and Dataframe Transformations

A Comprehensive Guide to Master Big Data Analysis

Picture from Unsplash

Introduction: The Big Data Problem

Apache Spark arose as a new engine and programming model for data analytics. Its origin goes back to 2009, and the main reason it has gained so much importance in recent years is a shift in the economic factors that underlie computer applications and hardware.

Historically, the power of computers only grew with time. Each year, new processors were able to perform operations faster, and the applications running on top of them automatically got faster too.

All of this changed in 2005, when the limits of heat dissipation caused the industry to switch from making individual processors faster to exploring the parallelization of CPU cores. This meant that applications, and the code that runs them, had to change too. This is what laid the groundwork for new models like Apache Spark.

In addition, the cost of sensors and storage has only decreased in recent years. Nowadays it is remarkably inexpensive to collect and store vast amounts of information.

There is so much data available that the way we process and analyze it must change radically too, by making large parallel computations on clusters of computers. These clusters combine the power of many machines working simultaneously, making expensive computational tasks like data processing much easier to tackle.

And this is where Apache Spark comes into play.

What is Apache Spark

As stated in the great book Spark: The Definitive Guide:

"Apache Spark is a unified computing engine and a set of libraries for parallel data procesing on clusters of computers"

Nowadays, Apache Spark is the most popular open source engine for Big Data processing. The main reasons are:

  • It supports widely used programming languages such as Python, Scala, Java and R.
  • It supports SQL tasks.
  • It enables data streaming.
  • It has libraries for Machine Learning and Deep Learning.
  • It can be run on a single machine or on a cluster of computers.

The following is a sketch that illustrates the different libraries available in the Spark ecosystem.

Figure by the Author

How to Set Up and Run Apache Spark

Throughout this series of articles, we will focus on Apache Spark's Python library, PySpark. As stated before, Spark can be run both locally and on a cluster of computers. There are several ways to configure our machines to run Spark locally, but they are out of the scope of these articles.

One of the simplest and fastest ways to work with PySpark and unlock its immense processing power is the free Databricks website, specifically its Community Edition.

To get started, we should simply go to:

Try Databricks

And select its Community Edition:

Then, we must create an account.

Running a Temporary Cluster

Once we have created an account, to be able to start working, we should create a temporary cluster.

As it is a free version, these clusters have a default of 6 GB of RAM and can run for 6 hours each. In order to develop industrial projects or work with Data Pipelines, it is suggested to use the premium platform.

But for the aim of these tutorials, the community edition will be more than enough.

Adding Data

In order to add data to work with:

  • Click on the data tab
  • Then add data

You can work either with data already uploaded by other users or with data uploaded from your own computer.

Once done, we can create a Table in a Notebook and we are all set up!

PySpark Applications & Partitions

To understand how Apache Spark works we should talk about the core components of a Spark Application: The Driver, the Executors and the Cluster Manager.

The following is a very illustrative sketch of a Spark Application Architecture:

Figure by the Author

Driver

The driver is located on a node of the cluster of computers and performs three main tasks:

  1. Holds information about the Spark Application
  2. Responds to input, for example a user's program
  3. Analyzes, distributes and schedules the tasks to be done by the executors.

Executors

The executors are the ones that actually perform the work assigned by the driver. They do two things:

  1. Executing the code assigned to them.
  2. Reporting the state of the computation to the driver.

Cluster Manager

The cluster manager is responsible for:

  1. Controlling the physical computers
  2. Distributing resources to Spark Applications

There can be several Spark Applications running on the same cluster, at the same time, and all of them will be managed by the Cluster Manager.

PySpark Dataframes

Apache Spark works with several data abstractions, each with a specific interface to work with. The most common abstractions are:

  • Datasets
  • Dataframes
  • SQL Tables
  • Resilient Distributed Datasets

Throughout this series we will focus on the most common unit to represent and store data in Apache Spark: Dataframes.

Dataframes are data tables with rows and columns; the closest analogy to understand them is a spreadsheet with labeled columns.

One important feature of Dataframes is their schema. A Dataframe's schema is a list with its column names and the type of data that each column stores.
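
For instance, instead of letting Spark infer it, we could declare a schema explicitly. The following is a minimal sketch; the column names and types are just an illustration, loosely based on the Apple stock dataset we will use later:

# Import the types needed to declare a schema explicitly
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
# Each StructField holds the column name, its data type and whether it can contain nulls
custom_schema = StructType([
    StructField('Date', StringType(), True),
    StructField('Open', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', LongType(), True)
])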

Another relevant attribute of Dataframes is that they are not located on one single computer; in fact, they can be split across hundreds of machines. This is done to optimize the processing of the information and to handle data that is too large to fit on a single machine.

Spark Partitions

As stated before, the executors perform the work assigned by the driver, and they do it in a parallel fashion. In order to make this possible, Spark splits the data into different partitions.

These partitions are collections of rows located on a single computer within the cluster. When we talk about a Dataframe's partitions, we are talking about how the data is distributed across all the machines in our cluster.

Most of the time we will not specify explicitly how the data should be partitioned across our cluster. Instead, our code will express high-level transformations of the data, and Spark will figure out by itself the optimal way to perform the partitioning, always aiming for maximum processing efficiency.
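
Even so, we can inspect and change the number of partitions directly from the Dataframe API. A small sketch, assuming a Dataframe named df:

# Check how many partitions the Dataframe is currently split into
print(df.rdd.getNumPartitions())
# Redistribute the data into 8 partitions (this triggers a shuffle across the cluster)
df_repartitioned = df.repartition(8)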

The low-level APIs to control these operations are out of the scope of this series.

Dataframe Transformations

First of all, we have to understand that transformations are the modifications that we specify to be applied to our Dataframes.

These transformations are specified in a high-level fashion and will not be executed until we explicitly call for an action to be made.

This way of working is called lazy evaluation, and its aim is to improve efficiency. When we call for transformations to be made, Spark designs a plan to perform these tasks optimally, and will not execute it until the very last minute, when we call an action (like .show() or .collect()).
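
As a minimal sketch, assuming a Dataframe named df like the one we load in the next section:

# This transformation only adds a step to Spark's execution plan; nothing is computed yet
cheap_days = df.filter(df['Close'] < 200)
# Calling an action such as .show() triggers the actual execution of the plan
cheap_days.show(5)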

Apple Stock Price

Now, we will explore some of the most common actions and transformations. We are going to work with Apple's stock price data from 2010 to 2016. We will perform some exploratory data analysis and data transformations, deal with missing values, and perform grouping and aggregating.

Import Dataframe

To initialize and display a Dataframe, the code is the following:

# File location and type
file_location = "/FileStore/tables/appl_stock.csv"
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
# Display Dataframe
display(df)

Get Dataframe’s Schema

The schema of a Dataframe is the description of the structure of its data: it is a collection of StructField objects that provides information about the type of data stored in each column of the Dataframe.

Displaying the Dataframe's schema is as simple as:

# Display Dataframe's Schema
df.printSchema()

Perform Filtering and Transformations

To filter our data and get only those rows that have a closing price smaller than $500, we can run the following line of code:

# Filter data using PySpark
df.filter("Close < 500").show()

We can also filter to only obtain certain columns:

# Filter data by columns
df.filter("Close < 500").select(['Open', 'Close']).show()

To filter by one column and show another, we will use the .select() method.

# Filter by one column and show other
df.filter(df['Close'] < 500).select('Volume').show()

To filter by multiple conditions:

# Filter by multiple conditions: closing price < $200 and opening price > $200
df.filter( (df['Close'] < 200) & (df['Open'] > 200) ).show()

Obtain a Statistical Summary of the Data

Similarly to other libraries like Pandas, we can obtain a statistical summary of the Dataframe by simply running the .describe() method.

# Display Statistic Summary
df.describe().show()

Add and Rename Columns

To add a new column to the dataframe, we will use the .withColumn() method as follows.

# Display Dataframe with new column
df.withColumn('Doubled Adj Close', df['Adj Close']*2).select('Adj Close', 'Doubled Adj Close').show()

To rename an existing column, we will use the .withColumnRenamed() method.

# Display Dataframe with renamed column
df.withColumnRenamed('Adj Close', 'Adjusted Close Price').show()

Grouping and Aggregating Data

Now, we will perform some grouping and aggregation of our data in order to obtain meaningful insights. But first, we should import some relevant functions:

# Import relevant libraries
from pyspark.sql.functions import dayofmonth, hour, dayofyear, weekofyear, month, year, format_number, date_format, mean, datediff, to_date, lit

Now, let us create a new column with the year of each row:

# To know the average closing price per year
new_df = df.withColumn('Year', year(df['Date']))
new_df.show()

Now, let's group by this recently created 'Year' column and aggregate by the maximum, minimum and average prices of each year to obtain meaningful insights into the evolution of the price.

# Group and aggregate data
from pyspark.sql import functions as f
agg_df = new_df.groupBy('Year').agg(f.max('Close').alias('Max Close'), f.min('Close').alias('Min Close'), f.mean('Close').alias('Average Close')).orderBy('Year')
agg_df.show()

We have achieved our goal! However, the result is still quite hard to read; in fact, we have way more decimals than we need.

Taking into account that we are working with prices of hundreds of dollars, more than two decimals do not provide us with relevant information.

So let's take the opportunity to learn how to format the results so they show only the number of decimals we want.

Formatting Our Data

To format our data we will use the format_number() function as follows:

# Import relevant functions
from pyspark.sql.functions import format_number, col
# Select the appropriate columns to format
cols = ['Max Close', 'Min Close', 'Average Close']
# Format the columns to two decimal places
formatted_df = agg_df.select('Year', *[format_number(col(col_name), 2).name(col_name) for col_name in cols])
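
Finally, we can display the formatted Dataframe to check the result:

# Display the formatted Dataframe
formatted_df.show()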

User Defined Functions

Let's now learn how to apply functions defined by ourselves to our Dataframes. In this example, we will use one to get a column with the name of the month in which each row was recorded.

# Import relevant functions
from pyspark.sql.functions import udf, month
from pyspark.sql.types import StringType
# Create a list with the month names
month_lst = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
# Define the user defined function: it maps a month number (1-12) to its name
month_name_udf = udf(lambda x: month_lst[int(x % 12) - 1], StringType())
# Add a column to df with the number of the month of the year
df = df.withColumn('moy_number', month(df.Date))
# Apply the function to generate a column with the name of the month of the year
df = df.withColumn('moy_name', month_name_udf('moy_number'))
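
To take a quick look at the new columns, we can run:

# Display the date together with the month number and month name
df.select('Date', 'moy_number', 'moy_name').show(5)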

Success!

Conclusion

Throughout this article we have covered:

  • The basics of Apache Spark
  • Why it is important and how it operates
  • How to perform analysis operations with PySpark and Dataframes

In the next articles, we will learn how to apply Machine Learning in PySpark and put this knowledge to work in some projects. Stay tuned!

Final Words

If you liked this post then you can take a look at my other posts on Data Science and Machine Learning here.

If you want to learn more about Machine Learning, Data Science and Artificial Intelligence follow me on Medium, and stay tuned for my next posts!

