End-to-end Distributed ML using AWS EMR, Apache Spark (Pyspark) and MongoDB Tutorial with MillionSongs Data

Kerem Turgutlu
Towards Data Science
10 min readJan 19, 2018

--

In this post I will mention how to run ML algorithms in a distributed manner using Python Spark API pyspark. We will also learn about how to set up an AWS EMR instance for running our applications on the cloud, setting up a MongoDB server as a NoSQL database in order to store unstructured data (such as JSON, XML) and how to do data processing/analysis fast by employing pyspark capabilities.

Requirements:

  • AWS account (or you can run on local)
  • Python 2.7 (for running the scripts that will be provided)

1. Setting up an EMR instance

What is Amazon EMR ? (url: https://aws.amazon.com/emr/)

Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data across dynamically scalable Amazon EC2 instances. You can also run other popular distributed frameworks such as Apache Spark, HBase, Presto, and Flink in Amazon EMR, and interact with data in other AWS data stores such as Amazon S3 and Amazon DynamoDB.

Even though this step will be optional in order to run the scripts and understanding the outline of what’s going on from top the bottom, usually the main reason why we might need distributed computing is the fact that our local machine specs are not capable of doing the processing, analysis or modeling we want.

We might as well launch multiple EC2 instances and configure master and worker nodes but all of these steps are actually taken care of EMR. EMR in short will allow us to automatically distribute jobs we want to run once we run them on our master instance. Otherwise we should be configuring and installing Spark on all nodes.

Below you can see how jobs are distributed through Spark framework. Driver runs the main process converting it into tasks and schedules them for executors. Then workers (executors) run these tasks.

Driver and Executor Architecture in Spark

For launching our first EMR instance we need to login to aws.amazon.com and then console will appear.

select “EMR” here
click “Create Cluster”
Here we should select “Spark” under Applications and define how many instances we want for our application. For example 1 for master and 2 for core nodes (executors which will run tasks). So here we will be distributing the job into 2 worker nodes.

You should also give a key-pair in order to ssh into nodes later and for running your application. Once you click create your cluster it will start bootstrapping and once everything is ready master and cores will be waiting. One key point about EMR is that it is not stoppable once you start it. Meaning you can’t stop and start EMR instance like regular EC2 instances. So it’s good practice to run all the applications and then terminate the EMR. If you try to stop without jobs get done you will need to start over again. Another common practice for data processing or analysis jobs is to use Amazon S3. EMR, S3, Spark get along very well together. You can store your data in S3, then read and process it without actually storing it in your nodes and after processing it through spark you can write it back to S3 and terminate EMR.

After EMR instance is ready you can go to your terminal and ssh into it by using your pem key and public DNS of the master.

$ ssh -i .ssh/mykey.pem hadoop@ec2–xx–xxx–xxx-xx.us-west-2.compute.amazonaws.com
EMR terminal

2. Getting the Data

In this tutorial I will be working with a subset of MillionSongs dataset but you any dataset over couple of GB would be support our point. Here is the link to get the data in many ways: https://labrosa.ee.columbia.edu/millionsong/pages/getting-dataset. Even though they provide this dataset in an EC2 image, for completeness we will pretend that this is our own data that we try to import into AWS. Data we will be exploring here is called MillionSongSubset which has randomly selected subset of million songs, a collection of 10,000 songs. You can refer to “MILLION SONG SUBSET” link and download the tar.gz file from there. You can download the data to your local then scp to both master and worker nodes but this will take forever to scp. So I would recommend here to use a curlwget extension through Chrome and download tar.gz file directly to master and worker nodes. The extension that I am using on Chrome is called CurlWget. Try to store data /mnt/ in your nodes since it may take a quite big space and don’t forget to store it under the same path for each node.

Once you’ve downloaded data you will see there are two main folders data/A/ and data/B/. These folders have sub directories which eventually have single song data under each .h5 hierarchical data file, more information about data is provided in getting-the-dataset url.

3. Converting H5 into Spark RDD with Pyspark

So far we’ve launched our EMR instance and get the data into same path for all nodes, now we will convert data into Spark RDD in order to use pyspark and it’s distributed computing functionalities. RDD (Resilient Distributed Dataset) is the way that spark represents data and stores it in partitions. Data can come from either text, csv, json format or a database.

I couldn’t find a direct way to convert H5 files into an RDD but this link https://www.hdfgroup.org/2015/03/from-hdf5-datasets-to-apache-spark-rdds/ inspired me to write my own script to do this conversion in parallel using pyspark. The basic idea is to collect all h5 filepahts in a python list and create an RDD out of this list. Then we can easily do the conversion we want using map function on this RDD.

# 8 is number of partitions to distribute the data
# For local applications it will be distributed to 8 threads
file_paths = sc.parallelize(h5_file_paths, 8)# Read and convert to python dict in parallelsongs_rdd = file_paths.map(lambda x: h5todict(x))
songs = songs_rdd.collect()

Here we also collect songs as a python list in order to create our MongoDB database, since a part of the tutorial is to show MongoDB as well but we might just stick with our RDD which contains python dicts (in this case each element is a python dictionary which has a single song data in a hierarchical relationship). Having RDD is enough to continue processing, analysis and modeling via pyspark.

Side note: The full script for conversion is provided here:https://github.com/KeremTurgutlu/distcomputing/blob/master/MillionSongs-Project/h5toMongo.py. All nodes should have the required python packages installed such as h5py, json, numpy…

For this you need to ssh into your worker nodes as well as your master node. To ssh into worker nodes, you should have your pem key in master and ssh with that through master terminal. After that all you need is pip install the required packages which are not already provided.

4. Setting up MongoDB

In this step we will install and setup mongodb in our master node. You can follow the steps here https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-hat/ for installation. After installation you should create a directory for the databases you create to be stored in. Again, creating them under /mnt as /mnt/mongo/db is better for disk usage. Also you will need to set permission to this directory, by running the command:

$ sudo chown -R `id -un` /mnt/mongo/db/

After this step run:

$ mongod --dbpath /mnt/mongo/db

Now MongoDB server should be listening to connections. On a separate terminal you can run mongo and start the mongo shell. In mongo shell you may view current databases with show dbs.

Now, having an alive connection to MongoDB. Next, we need to install pymongo package which is the python API for MongoDB into all nodes. After that we may run the script provided through github repo on our master. For that script to run we should also have https://github.com/KeremTurgutlu/distcomputing/blob/master/MillionSongs-Project/user_definition.py in the same path. In user definition we define executor-driver memories , max result and overhead memory in order not run into memory issues while running our application.

This script will read h5 files and create a mongoDB collections called ‘songs’ under database called ‘MillionSongs’ by default (you can of course specify other names for database and collection in user_definition.py). You may think of database as a regular SQL database, collection as a SQL table and document(dict of single song) as a row in the table here.

5. Distributed Analysis AND ML

At this point we have a MongoDB database named “MillionSongs” and under a collection named “songs” (or the database and collection you’ve specified). For your own data in order to have a database and collection for example from a json file that sits under your current directory, you may run this command from another terminal while mongod running in another terminal:

$ mongoimport --db dbName --collection collectionName --file fileName.json

With mongodb it’s very easy to filter, select, aggregate and to do many other operation by querying the data as we were doing in SQL. In our songs collection each song is represented as a document(a python dict) due to the way we’ve create and inserted them through h5tomongo.py script.

Our first task will be to predict what decade a particular song was released in. For more detailed explanations on data fields you may find this link helpful: https://labrosa.ee.columbia.edu/millionsong/pages/field-list. This task is promoted both in Kaggle and UCI websites. We will be using segments_timbre which is an MFCC-PCA like 2d feature about song’s texture, songs energy, songs danceability and songs year to complete our task.

2d features for 12 segments

First, let’s collect the data we want from mongodb:

We define the fields that we want to be returned by defining a dictionary which have the hierarchical field names (for example year has the analysis -> songs -> year hierarchy in our documents), 1 for the ones that we want to be returned and 0 for the ones we don’t want. By default _id of the document is also returned that’s why we explicitly deselected . You can see that we define a filter inside find function where we create train_year_data and test_year_data. Test data will be the data that has year field missing (which is imputed as 0 in the original data). Our task may be to build a good prediction model with training data in order to further impute the missing years in this test set.

from pymongo import MongoClientclient = MongoClient()
db = client[dbname]
collection = db[collection_name]
fields = {'musicbrainz.songs.year':1, 'analysis.segments_timbre':1, 'analysis.songs.energy':1, 'analysis.songs.danceability':1, 'metadata.songs.title':1, 'metadata.songs.artist_name': 1,
'_id':0}
train_year_data = collection.find({'musicbrainz.songs.year':{'$gte':1920}}, fields)
test_year_data = collection.find({'musicbrainz.songs.year':{'$eq':0}}, fields)

Second, let’s create our RDDs and Dataframes:

Spark SQL supports dataframes which we will be using for feeding into machine learning algorithms, it also has a very convenient support with pandas dataframes. Transition from MongoDB collection to Spark RDD is as easy as:

# create train and test rdds
train_year_rdd = sc.parallelize(list(train_year_data))
test_year_rdd = sc.parallelize(list(test_year_data))

We will create a 1d feature vector out of timbre_segments by taking the covariances over all segments and means of each segments. Adding up to total of 90 features for 12 segments. We will apply two custom functions through map in order to get the features we want to create our Spark SQL dataframes. Because train_year_rdd and test_year_rdd are still in hierarchical python dict format that we can’t use to create dataframes to feed into ML algorihtms. You can find the source code for these steps and further here: https://github.com/KeremTurgutlu/distcomputing/blob/master/MillionSongs-Project/predict_year.py

# create train and test dataframestrain_full_df = train_year_rdd.map(lambda d: getYearFeatures(d)).\
map(lambda d: getDFeatures(d)).toDF()
test_df = test_year_rdd.map(lambda d: getYearFeatures(d)).\
map(lambda d: getDFeatures(d, True)).toDF()

Create a training and validation set:

# split to train and validation dataframes
train_df, val_df = train_full_df.randomSplit([0.7, 0.3])

Create vector assembler:

Vector assembler take a list of desired column names that will be feed into ML algorithm, combine them and get them ready for optimized spark ML.

# define vector assembler and transform dataframes into correct format
# cache train and validation dataframes for fast iterations
va = VectorAssembler(inputCols=x_cols, outputCol='features')
train_va=va.transform(train_df).select('features','decade').cache()
val_va=va.transform(val_df).select('features', 'decade').cache()
test_va=va.transform(test_df).select('features', 'decade').cache()

Pyspark ML package provides a high variety of models, transformers and many options. It’s good to check out the documentation: http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html for personal applications. Almost every distributed ML algorithm is provided through this API but not deep learning. For deeplearning you can checkout H20.ai’s sparkling water: https://www.h2o.ai/sparkling-water/ or simply use a modern GPU, or even multiple with Pytorch’s nn.DataParallel.

Since we have our vector assemblers now we can do actual distributed training. I will show the code to run a logistic regression model but script provided in the repo runs both logistic regression and random forest.

# define model and fit lr = LogisticRegression(featuresCol='features', labelCol='decade',regParam=0.01, maxIter=1000, fitIntercept=True)
lr_model = lr.fit(train_va)

or we can simply do a grid search with cross-validating

# create cross validation
cv = CrossValidator().setEstimator(lr).setEvaluator(evaluator).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [1000]).addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(train_va)

We evaluate the model’s by using multi-class F1 score since accuracy wouldn’t be good enough.

For running the scripts provided:

In order to run spark jobs through terminal you need to use spark-submit command. After cloning or getting the scripts you want into a folder and defining the right file_roots in h5toMongo.py and desired memory allocations defined in user_definition you may run:

$ spark-submit h5toMongo.py

and

$ spark-submit predict_year.py

I hope that you will find this brief tutorial helpful, there is a lot to cover and to learn through documentation and exprience. So please let me know if you have any suggestions, recommendations or questions.

Thanks for reading !

--

--