
Distributed Data Pre-processing using Dask, Amazon ECS and Python (Part 1)

Will Badr
Towards Data Science
7 min read · Dec 18, 2018


The quality and accuracy of a machine learning model depend on many factors. One of the most critical is how the dataset is pre-processed before it is fed into the learning algorithm, so it is essential to give the algorithm the right data for the problem you want to solve. For example, some algorithms do not support null values, so those values must be transformed or otherwise handled before training. You should also understand why values are missing, whether they follow a specific pattern, and how they influence the results.

Data Engineers and Data Scientists often use tools from the Python ecosystem, such as NumPy and Pandas, to analyze, transform and visualize their data. These libraries are designed to be fast, intuitive and efficient. Performing such operations quickly is not challenging as long as the dataset fits into the memory of a single machine. However, if the dataset is too big to fit into a single machine, Data Engineers may be forced to rewrite their code for more scalable tools such as Spark and SparkML, backed by a large Amazon EMR cluster.

To overcome this problem, I use Dask. Dask builds on people’s familiarity with popular libraries like Pandas, so you can use it to write code that processes data in a scalable, parallel and distributed way.

In this article, I discuss how to create a serverless cluster to pre-process data in a distributed and parallel manner. I use a Dask Distributed cluster, which provides advanced parallelism for analytics and data processing. To enable performance at scale while optimizing cost and keeping the solution flexible, I build the cluster on AWS Fargate, which removes cluster management overhead and needs only one API call to scale the cluster up or down. I then integrate the cluster with an Amazon SageMaker Jupyter notebook (or any other IDE you prefer).

What is Dask Distributed?

Dask.distributed is a lightweight, open source library for distributed computing in Python. It is also a centrally managed, distributed, dynamic task scheduler. It has three main components:

dask-scheduler process: coordinates the actions of several workers. The scheduler is asynchronous and event-driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers.

dask-worker processes: these are spread across multiple machines, execute the computations assigned by the scheduler, and serve the concurrent requests of several clients.

dask-client process: the primary entry point for users of dask.distributed.
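To make the three roles concrete, here is a minimal sketch that starts a scheduler and two workers inside a single process with LocalCluster and drives them from a Client. It assumes the dask[distributed] package is installed. The names square and run_demo are illustrative only and are not part of the deployment described in this article, where the scheduler and the workers run as separate Fargate tasks.

```python
def square(x):
    """Toy task; in practice this would be a real pre-processing step."""
    return x * x


def run_demo(n_workers=2):
    """Start an in-process scheduler plus workers and run tasks through a client."""
    # Imported lazily so this module loads even where dask is not installed.
    from dask.distributed import Client, LocalCluster

    # LocalCluster starts the scheduler and the workers; processes=False keeps
    # them as threads in this process, which is enough for a demonstration.
    with LocalCluster(n_workers=n_workers, threads_per_worker=1,
                      processes=False) as cluster:
        # The Client is the entry point: it submits work to the scheduler.
        with Client(cluster) as client:
            futures = client.map(square, range(5))  # one task per input
            return client.gather(futures)           # collect results in order
```

Calling run_demo() should return the squares of 0 through 4, computed by the workers rather than by the calling code.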

Solution Diagram:

Solution Diagram and Architecture

I need low-latency communication and a simple networking configuration between the Jupyter notebook and the Fargate cluster. Hence, I create both the notebook and the Dask Distributed cluster in the same Virtual Private Cloud (VPC).

Pre-Deployment Steps:

First, I need to create an Elastic Container Registry (ECR) repository as a prerequisite. To do this, I go to the AWS Console -> Elastic Container Service (ECS) -> Repositories, then select “Create repository”.

(You can also skip this step and create a GitHub repository instead.)

I give the repository a name, “dask”, then click “Next Step”:

This leads to the “Get Started” page, which lists the initial commands to create and push the container image.

Now that the repository is ready, I switch to a terminal to download, build and tag the container image, then push it to ECR. To do that, I run the following shell commands:

bash# git clone https://github.com/wmlba/ECS-Dask.git
bash# cd ECS-Dask; tar -xzf base-image.tar.gz; cd base-image
bash# `aws ecr get-login --no-include-email --region us-east-1`
bash# docker build -t dask .
bash# docker tag dask:latest <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest
bash# docker push <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest

NOTE: To run the above commands, you need Docker installed on the machine you run them from, and AWS credentials configured.
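The tag and push commands repeat the same long image reference, which is easy to mistype. As a small sketch, the helper below assembles that reference from its parts and returns the docker commands as argument lists. The function names are illustrative; the defaults mirror the region, repository name and tag used in this article, and the account ID is supplied by the caller.

```python
def ecr_image_uri(account_id, region="us-east-1", repository="dask", tag="latest"):
    """Return the full ECR image reference used by `docker tag` and `docker push`."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def build_and_push_commands(account_id, region="us-east-1",
                            repository="dask", tag="latest"):
    """Return the docker build, tag and push commands as argument lists."""
    uri = ecr_image_uri(account_id, region, repository, tag)
    return [
        ["docker", "build", "-t", repository, "."],      # build the local image
        ["docker", "tag", f"{repository}:{tag}", uri],   # tag it for ECR
        ["docker", "push", uri],                         # upload it to ECR
    ]
```

Each list can be passed to subprocess.run(cmd, check=True) from the base-image directory once the ECR login has succeeded.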

The Docker image I am using is slightly modified from the one published in the Dask repository.

It will take a few minutes to build the image then push it to the ECR repository that was created in an earlier step. You can verify that by clicking on the repository name in the ECS console.

Deploy the solution

Now that I have the image ready, I need to deploy the solution using a CloudFormation template by following the steps below:

1. Launch this CloudFormation template in your account. It takes approximately 3–5 minutes for the CloudFormation stack to complete.

The CloudFormation stack will create resources such as a Fargate cluster, task definitions, services and tasks for both the Dask workers and the scheduler. It will also create an IAM execution role and policy to allow access to the Elastic Container Registry (ECR) repository, and CloudWatch log groups for logs. All the resources will be created in the us-east-1 region by default.

2. On the Specify Details page, I specify a private subnet with a NAT gateway for the cluster. The Dask scheduler and workers will communicate over a private network, and the NAT gateway is only needed so that the ECS service can pull the image from the ECR repository. Then, select Next:

3. On the Options page, select Next.

4. On the Review page, review and confirm the settings. Be sure to select the box acknowledging that the template will create AWS Identity and Access Management (IAM) resources with custom names. To deploy the stack, select Create. After a few minutes, the stack creation should be complete.
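The console steps above can also be scripted. The sketch below only assembles the arguments for the CloudFormation CreateStack call. The stack name, the template URL and the parameter key SubnetId are hypothetical placeholders, since the template's real parameter names are not listed here. The CAPABILITY_NAMED_IAM entry corresponds to the acknowledgement checkbox on the Review page.

```python
def create_stack_args(stack_name, template_url, subnet_id):
    """Assemble keyword arguments for CloudFormation's create_stack call."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        "Parameters": [
            # "SubnetId" is a hypothetical key; use the template's real one.
            {"ParameterKey": "SubnetId", "ParameterValue": subnet_id},
        ],
        # Needed because the template creates IAM resources with custom names.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }
```

With boto3 installed and credentials configured, the result could be passed on as boto3.client("cloudformation", region_name="us-east-1").create_stack(**args).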

After the stack is created, you can confirm that the ECS Dask cluster is deployed and running. To verify this, switch to the ECS Console -> Clusters -> Fargate-Dask-Cluster; on the Tasks tab, there should be 2 running tasks:
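The same check can be made from Python. This sketch takes an ECS client, such as the object returned by boto3.client("ecs"), and counts the tasks that ECS reports as running. The helper name is illustrative, and the default cluster name is the one shown in the console.

```python
def running_task_count(ecs_client, cluster="Fargate-Dask-Cluster"):
    """Return how many tasks in the cluster have a desired status of RUNNING."""
    # list_tasks returns one page of task ARNs, which is plenty for a
    # cluster with a handful of tasks.
    response = ecs_client.list_tasks(cluster=cluster, desiredStatus="RUNNING")
    return len(response["taskArns"])
```

Right after the stack is created, the expected value is 2: one scheduler task and one worker task.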

Now that the Dask cluster is ready, I will create a SageMaker notebook so I can start using the cluster. To do this, I switch to the SageMaker Console -> Notebook Instances -> Create Notebook Instance.

Then I will select the same VPC and Subnets that were selected earlier in the CloudFormation template:

NOTE: You can select any other subnet and security group as long as you enable access between the SageMaker notebook and the Dask Cluster.

Then, I create a new Python 3 notebook by clicking New -> conda_python3. Dask packages are installed by default on the SageMaker notebook, but it is important to make sure that they are updated to the latest version. To do that, I run the conda update command in the notebook:

NOTE: If the client version is lower than the scheduler and worker version, you will encounter errors when initiating the client.

The next step is to create the client and connect to the Dask cluster by running the code below:

Notice that I used the DNS name of the scheduler, which was assigned automatically by the ECS Service Discovery functionality. Service Discovery uses Route 53 auto naming API actions to manage Route 53 DNS entries.
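The connection snippet is not reproduced here, so the following is only a minimal sketch of such a connection. The host name is a placeholder for whatever DNS name Service Discovery assigned to your scheduler, and 8786 is the default dask-scheduler port. The get_versions(check=True) call is an optional extra that asks Dask to compare package versions across the client, scheduler and workers.

```python
def scheduler_address(dns_name, port=8786):
    """Build the TCP address of a Dask scheduler (8786 is the default port)."""
    return f"tcp://{dns_name}:{port}"


def connect(dns_name, port=8786):
    """Connect a Dask client to the scheduler at the given DNS name."""
    # Imported lazily so this module loads even where dask is not installed.
    from dask.distributed import Client

    client = Client(scheduler_address(dns_name, port))
    # Surface version mismatches early instead of during a computation.
    client.get_versions(check=True)
    return client
```

In the notebook this would be used as client = connect("<scheduler DNS name>"), with the placeholder replaced by the name registered in Route 53.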

Now let’s do some operations on the data using the cluster. Before that, I will scale the cluster up to 7 workers by running one command in the notebook, as below:
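The command itself is not reproduced here. One way to scale an ECS service from Python is the UpdateService API, sketched below. The cluster name comes from the console step earlier. The service name Dask-Workers is an assumption, so check the name that the CloudFormation stack actually created.

```python
def scale_workers(ecs_client, desired_count,
                  cluster="Fargate-Dask-Cluster", service="Dask-Workers"):
    """Ask ECS to run `desired_count` copies of the worker task."""
    # "Dask-Workers" is an assumed service name, not confirmed by the article.
    response = ecs_client.update_service(
        cluster=cluster,
        service=service,
        desiredCount=desired_count,
    )
    # ECS echoes back the updated service description.
    return response["service"]["desiredCount"]
```

With boto3, scale_workers(boto3.client("ecs", region_name="us-east-1"), 7) would request 7 workers; Fargate then takes a short while to start the new tasks.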

After a few seconds, the status of the worker tasks in the Fargate console will be ‘RUNNING’. I then restart the Dask client to make sure that we take advantage of the cluster’s parallelism.
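A hedged sketch of that step: restart the workers through the client, wait until the expected number has registered with the scheduler, and report how many the client can see. The client argument is assumed to be an existing dask.distributed Client, and the helper name and timeout are illustrative.

```python
def restart_and_count_workers(client, expected_workers, timeout=120):
    """Restart the cluster's workers and return how many are connected."""
    client.restart()  # restarts the workers and clears their state
    # Block until the scheduler sees the expected number of workers.
    client.wait_for_workers(n_workers=expected_workers, timeout=timeout)
    # nthreads() maps each worker address to its thread count.
    return len(client.nthreads())
```

For the cluster in this article, restart_and_count_workers(client, 7) would be expected to return at least 7 once all worker tasks are running.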

Now we have a cluster with 14 CPU cores and 14 GB of memory (2 CPU cores and 2 GB of memory for each of the 7 workers). Let’s do some compute- and memory-intensive operations and generate some insights. I load the data into a Dask dataframe and compute the trip distance grouped by the number of passengers.
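The notebook code is not reproduced here. To illustrate why a grouped aggregate parallelizes well, the pure-Python sketch below computes a mean trip distance per passenger count: each partition produces partial sums and counts, and these are then combined. This is an illustration of the idea only, not Dask's implementation. The sample rows, the choice of a mean, and the thread pool standing in for workers are all assumptions.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def partial_stats(rows):
    """Per-partition step: sum and count of distances per passenger count."""
    stats = defaultdict(lambda: [0.0, 0])
    for passengers, distance in rows:
        stats[passengers][0] += distance
        stats[passengers][1] += 1
    return dict(stats)


def grouped_mean(partitions, max_workers=4):
    """Combine per-partition partial results into one mean per group."""
    totals = defaultdict(lambda: [0.0, 0])
    # Each partition is processed independently, as a worker would do.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for stats in pool.map(partial_stats, partitions):
            for key, (total, count) in stats.items():
                totals[key][0] += total
                totals[key][1] += count
    return {key: total / count for key, (total, count) in totals.items()}


# Made-up rows of (passenger_count, trip_distance), split into two partitions.
partitions = [
    [(1, 2.0), (2, 4.0)],
    [(1, 4.0), (2, 6.0), (3, 9.0)],
]
result = grouped_mean(partitions)  # {1: 3.0, 2: 5.0, 3: 9.0}
```

With Dask, a comparable query might look like dd.read_csv(path).groupby("passenger_count")["trip_distance"].mean().compute(), where the path and the column names are assumptions about the dataset.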

The results start to show up after about 2.5 minutes, with the task parallelized across 7 workers and more than 10 GB of data loaded in parallel.

Visualization:

These screenshots from the Bokeh server on the scheduler task show the operations being spread across the workers. The dashboard can be accessed in a browser at the scheduler’s IP address on port 8787:

The following screenshot shows the resource (CPU and memory) utilization of each worker:

Now you should be ready to do some pre-processing magic!

In part 2, I will show some code for running analytics, pre-processing/feature engineering and machine learning on the Dask cluster we created.

Thanks for reading the post. Feedback and constructive criticism are always welcome. I will read all of your comments.

-Will
