TECHNICAL TIPS

Some issues when building an AWS data lake using Spark and how to deal with these issues

A lengthy post for ALL who struggle with Spark and Data Lake

Tran Nguyen
Towards Data Science
13 min read · Aug 6, 2020

Introduction

At first, it seemed quite easy to write and run a Spark application. If you are experienced with data frame manipulation using pandas, NumPy and other Python packages, and/or with SQL, creating an ETL pipeline for your data using Spark is quite similar, and even much easier than I thought. And compared to other databases (such as Postgres, Cassandra, or an AWS data warehouse on Redshift), creating a Data Lake database using Spark appears to be a carefree project.

But then, when you deployed the Spark application on AWS with your full dataset, the application started to slow down and fail. It ran forever, and you could not even tell from the AWS EMR console whether it was still running. You might not know where it failed, so it was difficult to debug. The Spark application behaved differently between local mode and stand-alone mode, and between the test set (a small portion of the dataset) and the full dataset. The list of problems went on and on. You felt frustrated. Really, you realized that you knew nothing about Spark. Well, optimistically, it was indeed a very good opportunity to learn more about Spark. Running into issues is normal in programming anyway. But how do you solve problems quickly? Where do you start?

After struggling with creating a Data Lake database using Spark, I feel the urge to share what I have encountered and how I solved these issues. I hope it is helpful for some of you. And please, correct me if I am wrong. I am still a newbie in Spark anyway. Now, let’s dive in!

Cautions

1. This article assumes that you already have some working knowledge of Spark, especially PySpark, command line environment, Jupyter notebook and AWS. For more about Spark, please read the reference here.

2. It is your responsibility to monitor usage charges on the AWS account you use. Remember to terminate the cluster and other related resources each time you finish working. An EMR cluster is costly.

3. This is one of the assessed projects for the Data Engineering nanodegree on Udacity. To respect the Udacity Honor Code, I do not include the full notebook with the workflow to explore and build the ETL pipeline for the project. Part of the Jupyter notebook version of this tutorial, together with other tutorials on Spark and many more data science tutorials, can be found on my GitHub.

Reference

  • Some of the materials are from the Data Engineering nanodegree program on Udacity.
  • Some ideas and issues were collected from the Knowledge — Udacity Q&A Platform and the Student Hub — Udacity chat platform. Thank you all for your dedication and great contribution to us.

Project Introduction

Project Goal

Sparkify is a startup company working on a music streaming app. Through the app, Sparkify has collected information about user activity and songs, which is stored as a directory of JSON logs (log-data - user activity) and a directory of JSON metadata files (song_data - song information). These data reside in a public S3 bucket on AWS.

To support its business growth, Sparkify wants to move its processes and data to a data lake in the cloud.

This project is a workflow to explore and build an ETL (Extract, Transform, Load) pipeline that:

  • Extracts data from S3
  • Processes data into analytics tables using Spark on an AWS cluster
  • Loads the data back into S3 as a set of dimensional and fact tables for the Sparkify analytics team to continue finding insights in what songs their users are listening to.

Below are samples from a JSON log file and a JSON song file:

Sample of the log_data json file
Sample of the song_data json file

The dimension and fact tables for this database were designed as follows:
Fields in bold: partition keys.

(ERD diagram was made using https://dbdiagram.io/)

Project Workflow

This is my workflow for the project. An experienced data engineer might skip many of these steps, but for me, I would rather go slowly and learn more:

  • Build the ETL process step-by-step using Jupyter notebook on sample data in the local directory; write output to the local directory.
  • Validate the ETL Process using the sub-dataset on AWS S3; write output to AWS S3.
  • Put all the code together to build the script etl.py and run it in Spark local mode, testing both the local data and a subset of the data on s3//udacity-den. The output of the task can be tested using the Jupyter notebook test_data_lake.ipynb.
  • Build and launch an EMR cluster. As far as I know, you can submit the project on Udacity without using EMR, but I highly recommend running it in Spark stand-alone mode on AWS to see how it works. You will definitely learn a lot more.
  • Submit a Spark job for etl.py on the EMR cluster, using a subset of the data on s3//udacity-den.
  • Finally, submit a Spark job for etl.py on the EMR cluster, using the full dataset on s3//udacity-den.
  • Try to optimize the Spark performance using various options.
  • Provide example queries and results for song play analysis. This part was described in another Jupyter notebook called sparkifydb_data_lake_demo.ipynb.

The validation and demo parts can be found on my GitHub. The script file etl.py and my detailed notebook sparkifydb_data_lake_etl.ipynb are not available, out of respect for the Udacity Honor Code.

Some Tips and Issues in The Project

Tip 1 — Build the ETL process incrementally in Jupyter notebook before building the ETL pipeline to process a whole dataset with scripts.

  • Jupyter notebook is a great environment for exploratory data analysis (EDA), testing things out and promptly validating the results. Since debugging and optimizing a Spark application is quite challenging, it is highly recommended to build the ETL process step by step before putting all the code together. The advantage becomes clear in the later tips.
  • Another important reason for using Jupyter notebook: it is impractical to create the etl.py script and then try to debug it, since you would have to create a Spark session each time you run the etl.py file. With the notebook, the Spark session is always available.

Tip 2— Carefully explore the dataset. If the dataset is “big”, start the project with a small subset.

To work on the project, we first need an overview of the dataset, such as the number of files, the number of lines in each file, the total size of the dataset, the structure of the files, etc. This is especially crucial if we work on the cloud, where requests can cost a lot of time and money.

To do that, we can use boto3, the Amazon Web Services (AWS) SDK for Python. boto3 allows us to access AWS via an IAM user. The detail on how to create an IAM user can be found here, Step 2: Create an IAM user.

Below is the way to set up the client for S3 on Jupyter notebook:

The access key ID and secret access key obtained from an IAM user can be saved to the file credentials.cfg in the local directory as below. Note that you may run into a “configure file parsing error” if you put your key and secret key inside “ ” or ‘ ’, or if the file does not have a header such as [AWS].

The content of the credentials.cfg file.
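As a minimal sketch of how such a file could be read, assuming Python’s standard configparser module does the parsing (the article does not say which parser was used). The option names and the values are hypothetical placeholders, not real credentials:

```python
import configparser

# Hypothetical file content; with a real file use config.read("credentials.cfg").
CFG_TEXT = """\
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLEKEYID
AWS_SECRET_ACCESS_KEY = exampleSecretKey
"""

config = configparser.ConfigParser()
config.read_string(CFG_TEXT)
key_id = config["AWS"]["AWS_ACCESS_KEY_ID"]
secret = config["AWS"]["AWS_SECRET_ACCESS_KEY"]

# Quotes are NOT stripped by configparser: they become part of the value.
quoted = configparser.ConfigParser()
quoted.read_string('[AWS]\nAWS_ACCESS_KEY_ID = "EXAMPLEKEYID"\n')
quoted_value = quoted["AWS"]["AWS_ACCESS_KEY_ID"]

# A file without a section header such as [AWS] cannot be parsed at all.
try:
    configparser.ConfigParser().read_string("AWS_ACCESS_KEY_ID = EXAMPLEKEYID\n")
    header_error = None
except configparser.MissingSectionHeaderError as exc:
    header_error = exc

# With real credentials, the S3 client could then be created with boto3, e.g.:
#   import boto3
#   s3 = boto3.client("s3", region_name="us-west-2",
#                     aws_access_key_id=key_id,
#                     aws_secret_access_key=secret)
```

Under this assumption, a quoted key is read with its quotes included, so the value handed to AWS would be wrong, while a missing header makes the parser reject the file outright.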

With this client for S3 created by boto3, we can access the dataset for the project and look at the file structures of log-data and song_data:

The outputs of the exploration process are:

The dataset is not big, ~3.6MB. However, the song_data has ~15,000 files. It is better to use a subset of song_data, such as ‘song_data/A/A/A/’ or ‘song_data/A/’ for exploring/creating/debugging the ETL pipeline first.
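The exploration step can be sketched with a small helper that counts the objects under a prefix and sums their sizes. It relies on the boto3 S3 paginator for list_objects_v2; the helper name summarize_prefix, the offline stand-in client, the bucket name and the file sizes are all hypothetical, and the stand-in exists only so the sketch runs without AWS access:

```python
def summarize_prefix(s3_client, bucket, prefix):
    """Return (object_count, total_bytes) for all objects under a prefix."""
    paginator = s3_client.get_paginator("list_objects_v2")
    count, total_bytes = 0, 0
    # list_objects_v2 returns at most 1,000 keys per call; the paginator
    # follows the continuation tokens for us.
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            count += 1
            total_bytes += obj["Size"]
    return count, total_bytes


class _OfflinePaginator:
    """Hypothetical stand-in that mimics the shape of boto3 result pages."""

    def __init__(self, pages):
        self._pages = pages

    def paginate(self, Bucket, Prefix):
        for page in self._pages:
            yield {"Contents": [o for o in page if o["Key"].startswith(Prefix)]}


class OfflineS3Client:
    """Hypothetical stand-in for boto3.client("s3"), for offline runs only."""

    def __init__(self, pages):
        self._pages = pages

    def get_paginator(self, operation_name):
        assert operation_name == "list_objects_v2"
        return _OfflinePaginator(self._pages)


# Made-up listing, split over two pages like a long real listing would be.
fake_pages = [
    [{"Key": "song_data/A/A/A/a.json", "Size": 250},
     {"Key": "song_data/A/A/B/b.json", "Size": 300}],
    [{"Key": "song_data/A/B/A/c.json", "Size": 200},
     {"Key": "log_data/2018/11/d.json", "Size": 5000}],
]
client = OfflineS3Client(fake_pages)
small = summarize_prefix(client, "example-bucket", "song_data/A/A/A/")
larger = summarize_prefix(client, "example-bucket", "song_data/A/")
```

With a real boto3 client in place of the stand-in, comparing a narrow prefix against a wider one gives a quick feel for how much data each test run would touch.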

Tip 3— Include defined schema when reading files to a data frame in Spark

My ETL pipeline worked very well on the subset of the data. However, when I ran it on the whole dataset, the Spark application kept freezing without any error message. I had to shrink and grow the subset to actually see the error and fix the problem, for example changing from ‘song_data/A/A/A’ to ‘song_data/A/’ and vice versa. So what is the problem here?

  • It turned out that, for this specific data, my Spark application could automatically figure out the schema on the small dataset. But it could not on the bigger dataset, perhaps due to inconsistency among the files and/or incompatible data types.
  • Moreover, loading takes less time with a defined schema, since Spark does not need an extra pass over the files to infer it.

How to design a correct schema:

  • You can manually create a schema by looking at the structure of the log_data JSON files and the song_data JSON files. For simple visualization, I generated the view using the pandas data frame as below
Sample of the log_data json file
Sample of the song_data JSON file
  • For me, the trick is to let Spark figure out the schema on its own by reading a small subset of the files into a data frame, and then to use that result to create the right schema. With that, we don’t need to guess any data types, whether STRING, DOUBLE, LONG, etc. The demonstration for this trick is as follows:

Tip 4— Print out the task and record the time of each task

Although it is a best practice in programming, we sometimes forget to do it. For a big dataset, observing the time for each task is very important for debugging and optimizing the Spark application.

By recording the time, we know that it takes around 9 minutes to read all the .json files from song_data on S3 into a Spark data frame using Spark in local mode.

Unless you turn off INFO logging in Spark, it is very difficult, if not impossible, to follow the progress of the Spark application on the terminal, which is flooded with INFO messages. Printing out the task name and recording the time makes everything easier to follow:

Printing out the task name and recording the time helps us to keep track of the progress of the application
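A minimal sketch of such a timer, using only the Python standard library. The helper name timed_task and the task label are hypothetical, and time.sleep stands in for the real Spark work:

```python
import time
from contextlib import contextmanager
from datetime import datetime


@contextmanager
def timed_task(name, log=print):
    """Log when a task starts and how long it took, even if it fails."""
    start = time.perf_counter()
    log(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] START {name}")
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        log(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] DONE  {name} in {elapsed:.1f}s")


# Messages are collected in a list here; in etl.py the default print is enough.
messages = []
with timed_task("read song_data (stand-in)", log=messages.append):
    time.sleep(0.01)  # placeholder for e.g. a spark.read.json(...) call plus an action

for line in messages:
    print(line)
```

Because Spark evaluates lazily, the timed block should contain an action (such as a count or a write); timing a transformation alone mostly measures how long it takes to build the plan.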

Tip 5 — What is the best way to import and use a function in pyspark package?

There are at least 2 ways to import and use a function, for example:

  • from pyspark.sql.functions import max
  • or import pyspark.sql.functions as F and then use F.max

Either is fine. I prefer the second approach since I don’t need to list all the functions on the top of my script etl.py.

Notice that the max function is a special case, since it shares its name with Python’s built-in max function, and importing it directly shadows the built-in in your script. To use the max function from the pyspark.sql.functions module safely, use F.max or import it under an alias, such as from pyspark.sql.functions import max as max_

Tip 6 — What is wrong when my Spark application freezes?

There could be many problems with it. I ran into several myself:

  1. The difference in the AWS region: Please make sure to use us-west-2 when setting up boto3/EMR cluster/S3 output bucket, etc. since the available dataset is on that AWS region.
  2. Didn’t include defined schema when reading files to a data frame: Fix using Tip 3.
  3. It takes a very long time to run the ETL pipeline on the whole dataset: The project is quite impractical because reading from and writing to S3 from EMR/Spark is extremely slow. When running the ETL pipeline on a small subset, you can see the same pattern of INFO logging repeated again and again on the terminal, such as the one below:

It was at the “INFO ContextCleaner: Cleaned accumulator xxx” lines that my Spark application appeared to freeze again and again. This is expected to be a long-running job: it took me ~115 min to write only the songs table into the S3 bucket. So if you are sure that your end-to-end process works perfectly, be patient for 2 hours and let it run. The process can be sped up; see Tip 9 below.

  4. Checking the running time on the AWS EMR console: You can see how long your Spark application ran by choosing the Application user interfaces tab of your cluster on the EMR console. The list of applications can be found at the end of the page:

My ETL pipeline on the whole dataset took ~2.1 hrs to finish on the EMR cluster (1 Master node and 2 Core nodes of type m5.xlarge).

Tip 7 — Auto-increment for songplays_id using Spark— It is not a trivial issue.

This issue is trivial in other databases: In Postgres, we can use SERIAL to auto-increment a column, such as songplays_id SERIAL PRIMARY KEY. In AWS Redshift, we can use IDENTITY(seed, step).

It is not trivial to perform auto-increment for a table using Spark, at least when you try to understand it deeply and take Spark performance into consideration. Here is one of the good references to understand auto-increment in Spark.

There are 3 methods for this task:

  • Using the row_number() window function in Spark SQL
  • Using the rdd.zipWithIndex() function to create indexes on the RDD, then converting the result back to a data frame
  • Using the monotonically_increasing_id() function

I prefer the rdd.zipWithIndex() function:

Step 1: From the songplays_table data frame, use the rdd interface to create indexes with zipWithIndex(). The result is an RDD of pairs; each pair contains 2 elements: (i) all the columns from the old data frame zipped into a “row”, and (ii) the auto-increment index:

Step 2: Convert it back to a data frame. We need to write a lambda function for it.

Tip 8— Talking about time, how long does it take to load and write each table?

Below is the time for running the Spark application on AWS EMR cluster, reading from and writing to S3:

My EMR cluster had 1 Master node and 2 Core nodes of type m5.xlarge, as shown below:

aws emr create-cluster --name test-emr-cluster --use-default-roles --release-label emr-5.28.0 --instance-count 3 --instance-type m5.xlarge --applications Name=JupyterHub Name=Spark Name=Hadoop --ec2-attributes KeyName=emr-cluster  --log-uri s3://s3-for-emr-cluster/

Tip 9 —How to speed up the ETL pipeline?

We definitely want to optimize the Spark application, since reading from and writing to S3 takes a long time. Here are some optimizations that I have tried:

Set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2

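You can read in detail about it here. It can be done simply by calling spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2") on the Spark session. (The spark.hadoop. prefix in the heading is the form used when the option is passed as a Spark configuration property, for example with --conf at submit time.)

With this optimization, the total ETL time dropped dramatically from ~2.1 hr to only 30 min.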

Use HDFS to speed up the process

“On a per-node basis, HDFS can yield 6X higher read throughput than S3”. So we can save the analytics tables to HDFS first, and then copy them from HDFS to S3 using s3-dist-cp.

Tip 10 — How is the output? How do the analytics tables turn out on S3?

This ETL pipeline is a long-running job, in which writing the songs table took most of the time. The songs table was partitioned by “year” and “artist”, which can produce skewed data, since some early years (1961 to 199x) don’t contain many songs compared to the years 200x.

The data quality checks that verify whether the ETL pipeline successfully added all the records to the tables, together with some example queries and results for song play analysis, can be found in my notebook on GitHub.

Tip 11 — Don’t let AWS Billing Dashboard confuse you

Although I had used AWS “quite a lot” and had already reached the Free Tier usage limit with this account, whenever I went to the Billing Dashboard, the total amount due was 0.

Don’t let the AWS Billing Dashboard confuse you. What it shows is the total balance, not your AWS expense. It is the balance which, according to Wikipedia, is “the difference between the sum of debit entries and the sum of credit entries entered into an account during a financial period.”

I thought that when looking at the AWS Billing Dashboard, I would see the amount I had spent so far, my AWS expense. But no. Even when I clicked on Bill Details, everything was 0. And so I thought that I hadn’t used AWS that much. My promo credit was still safe.

Only when, one day, I clicked the Expand All button did I realize, to my big surprise, that my promo credit was almost gone!!! So again, what you see on the Dashboard is the balance, not the expense. Be careful when using your EMR and EC2 clusters. They may cost you more money than you thought. (Well, although I admit that gaining AWS experience is so worth it.)

Thank you so much for reading this lengthy post. I am aware that people get discouraged easily by long posts, but I wanted to give you a consolidated report. Good luck with your project, and I am more than happy to discuss it.

The Jupyter notebook version of this post, together with other tutorials on Spark and many more data science tutorials, can be found on my GitHub.
