
How to build a Dask distributed cluster for AutoML pipeline search with TPOT

An in-depth tutorial, guiding you through all the steps required to set up a scalable, automated machine learning model pipeline search…


Photo by fabio on Unsplash

Were you ever in a spot where you knew you had done your best cleaning and wrangling your dataset but still couldn’t pick the best model? Do you have a lot of CPUs lying around, but they sit in separate hosts? Then this is the guide for you! We will explore Dask, and in particular its distributed library, to not only parallelize our TPOT pipeline searches but also distribute them across different machines.


What is Dask?

Dask is an open-source parallel computing library that scales the existing Python ecosystem. It integrates well with the common data science stack: NumPy, pandas, and scikit-learn. With Dask, we can natively scale huge computations from a laptop to a whole cluster. We will be focusing on the latter.
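To make this concrete, here is a minimal sketch of the kind of drop-in scaling Dask offers; the array shape and chunk size below are arbitrary values chosen for illustration:

import dask.array as da

# Build a 100,000 x 10,000 random array out of 1,000 x 1,000 chunks.
# Nothing is allocated yet; Dask only records a task graph.
x = da.random.random((100_000, 10_000), chunks=(1_000, 1_000))

# Familiar NumPy-style API, still lazy.
column_means = x.mean(axis=0)

# compute() runs the graph in parallel on whatever scheduler is available:
# threads on a laptop, or a distributed cluster like the one we build below.
print(column_means.compute())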

What is Dask Distributed?

Dask.distributed is a lightweight, open-source library for distributed computing in Python.

Architecture: Dask.distributed is a centrally managed, distributed, dynamic task scheduler. It has three main processes:

  1. dask-scheduler: The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients.
  2. dask-worker: The dask-worker processes are spread across multiple machines and carry out the computations that the scheduler assigns to them.
  3. dask-client: The dask-client is the primary entry point for users of dask.distributed; a short connection sketch follows this list.
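To make the client’s role concrete, here is a minimal connection sketch; the scheduler address is a placeholder that you would replace with your own:

from dask.distributed import Client

# Attach to a running scheduler (placeholder IP; the default scheduler port is 8786).
client = Client("tcp://192.168.1.100:8786")

# Printing the client shows the workers and resources the scheduler knows about.
print(client)

# Work submitted through the client is distributed across the connected workers.
future = client.submit(sum, range(100))
print(future.result())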

Steps to set up scheduler and workers:

For this example, we will be using VMware ESXi to create 3 virtual machines on a single host machine. One will be the scheduler, which will also run a Jupyter notebook, and the other two will be the workers. This is just a proof-of-concept setup; it is admittedly not the most efficient way to spin up cluster workers. Even so, it presents a more general process that can then be adapted to cloud solutions and/or Dask Kubernetes.

We will start with the scheduler instance. First, click on Create/Register VM > Create a new virtual machine > Next.

You can select any name, but I would recommend assigning an easily recognizable one like cluster-master. For this example, I am using Ubuntu 20.04.1 live server on all machines. It is not mandatory, but it is highly recommended to use the same distribution on all machines. The machines will be used to perform computations on large datasets¹, so it is also highly recommended to assign a lot of RAM to each machine and, if that is not possible, to at least set up swap memory. For this example, we will be using 24 GB of RAM and 8 vCPUs on each machine.

¹ In this tutorial we will be using pandas DataFrames. For particularly large datasets we can use Dask’s DataFrame object. A Dask DataFrame is a large parallel DataFrame composed of many smaller pandas DataFrames, split along the index. These pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster.
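As an illustration, here is a minimal sketch of swapping pandas for Dask’s DataFrame; the file pattern and column names are hypothetical:

import dask.dataframe as dd

# Read many CSV files into a single lazy DataFrame,
# partitioned into smaller pandas DataFrames under the hood.
df = dd.read_csv("data/part-*.csv")

# Familiar pandas-style operations build a task graph instead of running immediately.
summary = df.groupby("some_column")["some_value"].mean()

# compute() materializes the result, spilling to disk or spreading the work
# across cluster workers as needed.
print(summary.compute())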

Here is a snapshot of our VM specs. You can change these according to your needs.

Repeat the process for the worker machines. We chose the same specs, but that’s not mandatory. After installing the OS on each machine, we will install the necessary software through SSH. For small clusters like this, we chose ClusterSSH. ClusterSSH is a Tk/Perl wrapper around standard Linux tools like XTerm and SSH. We will use it to replicate commands to all machines simultaneously. Without further ado, let’s get down to business.

Run the following command to install ClusterSSH on your local machine. In this tutorial, we will be managing the cluster via SSH from our personal computer. We will not configure ClusterSSH; we will just connect to the machines by passing the usernames and IP addresses in the CLI. For bigger or multiple clusters, it is recommended to edit the configuration file located at ~/.csshrc.

sudo apt-get install clusterssh
clusterssh cluster-master-name@cluster-master-ip cluster-worker1-name@cluster-worker1-ip cluster-worker2-name@cluster-worker2-ip

In the picture above, anything you type in the grey CLUSTERSSH window will be replicated across all machines. You can also type shell commands into each of the windows individually. We want to replicate the following commands to all machines:

sudo apt-get update
sudo apt-get upgrade -y
wget https://repo.anaconda.com/archive/Anaconda3-2020.07-Linux-x86_64.sh
chmod 755 Anaconda3-2020.07-Linux-x86_64.sh
bash Anaconda3-2020.07-Linux-x86_64.sh
conda init

Then we will have to restart the shell, which is as simple as disconnecting and reconnecting through ClusterSSH. If you installed Anaconda3 to a different location, just export the path and run ./anaconda3/condabin/conda init. After reconnecting, run the following commands to install the necessary prerequisites for TPOT and Dask. Do note that we are also installing all the optional dependencies, except PyTorch, for TPOT’s extra functionality.

conda install -c conda-forge tpot xgboost dask distributed dask-ml scikit-mdr skrebate -y
conda update conda
conda update --all

Steps to set up the distributed cluster:

First, we need to set up the scheduler so we can later connect the rest of the machines. Although all of these machines reside on the same host, just to cover the general case we will not connect them via LAN. Let’s get started! We will use a terminal multiplexer called screen to manage multiple shells in one SSH session. Here is a quick cheat sheet of the options you will need:

screen -S session_name #Starts a new screen named session_name
screen -r #Resumes a screen session
screen -ls #Returns the session IDs of running screens
screen -X -S <id or name> kill #Kills the specified screen
While inside an attached screen: Ctrl+a d #Detaches the current screen

Great! Now we are ready to set up the scheduler inside a screen! Click on the master/scheduler VM instance SSH window and type the following:

screen -S scheduler
dask-scheduler
You should be greeted by this output!

The scheduler IP address will, of course, be different. The scheduler itself listens on port 8786, and this is where we will point all our workers. Port 8787 hosts the Dask dashboard; we will get back to it after we connect all the workers. For now, press Ctrl+a d to detach the screen. Now let’s set up our Jupyter notebook:

screen -S notebook
jupyter notebook --no-browser --port=8888
Save the token! You will need it later.

Save the token, detach the screen, and close the SSH connection. Do not close the SSH connections to the other machines. We now have to forward the port of the remote notebook on our master VM instance to a local port so that we can access it from our browser. We can achieve this with the following command:

ssh -N -f -L localhost:8003:localhost:8888 remoteuser@remotehost
# Forwards port 8888 of the remote machine to port 8003 of our local machine. We can now access the notebook in our browser at localhost:8003

Go to localhost on the port specified (we chose 8003), enter the token you saved, and voila! The notebook is running on the remote machine while we edit it from our local machine.

Now we can proceed to set up all the workers to point to the scheduler. This is pretty simple: we create a screen via the grey ClusterSSH terminal, this time on all the workers, and point them to the scheduler on our master VM instance.

screen -S worker
dask-worker IP-of-the-scheduler:8786 --nprocs 1 --nthreads 8

Detach the screens and you’re all set. The workers are connected, the scheduler is up and running, and you also have a dashboard to check everything! Use the nprocs and nthreads parameters to choose the number of processes and threads per worker, respectively. This choice depends on the workload. Dask’s main contributor Matthew Rocklin recommends the following:

Using few processes and many threads per process is good if you are doing mostly numeric workloads, such as are common in Numpy, Pandas, and Scikit-Learn code, which is not affected by Python’s Global Interpreter Lock (GIL). However, if you are spending most of your compute time manipulating Pure Python objects like strings or dictionaries then you may want to avoid GIL issues by having more processes with fewer threads each. Using more processes avoids GIL issues, but adds costs due to inter-process communication. You would want to avoid many processes if your computations require a lot of inter-worker communication. source: MRocklin – StackOverflow


What is TPOT?

TPOT is a Python automated machine learning (AutoML) tool that optimizes machine learning pipelines using genetic programming. TPOT is built on top of scikit-learn and, just like Dask, it uses existing Python APIs and data structures. This means its usage should be pretty intuitive for scikit-learn users. It also integrates quite nicely with Dask!
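As a taste of that scikit-learn-like interface, here is a minimal local sketch (no Dask yet) along the lines of the basic examples in the TPOT docs; the generation and population sizes are kept small just so it runs quickly:

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# Same fit/score interface as any scikit-learn estimator.
tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2, random_state=42)
tpot.fit(X_train, y_train)
print(tpot.score(X_test, y_test))

# The winning pipeline can be exported as ordinary scikit-learn code.
tpot.export("tpot_best_pipeline.py")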

An example machine learning pipeline – Source: Docs

A quick overview of TPOT:

TPOT uses an evolutionary algorithm to find the best pipeline. But what is a pipeline? In machine learning projects you seldom have the ideal format of data for creating a performant model. There are many transformations you can perform, such as imputation, feature scaling and normalization, or categorical variable encoding, which in and of itself has more than one way of being implemented, e.g. one-hot or target encoding. Then you could create an ensemble by bagging, boosting, or stacking models. The final model will use a combination of some or all of the above. This is called a pipeline. TPOT tries out various pipelines and lets them "randomly mutate", just like living organisms do (or viruses and extrachromosomal DNA if you want to be pedantic about it), until it finds a better-performing one. You can read the full TPOT paper here.
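For reference, here is what a single such pipeline might look like when hand-built in scikit-learn; the column names are hypothetical and purely for illustration:

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical feature columns.
numeric_cols = ["age", "income"]
categorical_cols = ["country"]

preprocess = ColumnTransformer([
    # Impute, then scale the numeric features.
    ("num", Pipeline([("impute", SimpleImputer(strategy="median")),
                      ("scale", StandardScaler())]), numeric_cols),
    # One-hot encode the categorical features.
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_cols),
])

# The full pipeline: preprocessing steps followed by a model.
pipeline = Pipeline([("preprocess", preprocess),
                     ("model", RandomForestClassifier(n_estimators=100))])

TPOT explores this space of preprocessors, models, and their hyperparameters for you, instead of you wiring up each candidate pipeline by hand.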

An example TPOT pipeline – Source: Docs

Finally, let’s use our cluster to find the best pipeline for a dataset. The following notebook is taken mostly from the docs, with some slight changes.
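Roughly, the idea is to connect a Client to our scheduler and run TPOT with use_dask=True so that the pipeline evaluations are farmed out to the cluster. Here is a minimal sketch of that flow, with the scheduler address as a placeholder and the digits dataset standing in for your own data:

from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

# Connect to the running scheduler (placeholder IP; use your scheduler's address).
client = Client("tcp://192.168.1.100:8786")

X, y = load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# use_dask=True hands the pipeline evaluations to the distributed cluster;
# n_jobs=-1 lets each generation use all available workers.
tpot = TPOTClassifier(
    generations=5,
    population_size=40,
    cv=5,
    n_jobs=-1,
    use_dask=True,
    verbosity=2,
    random_state=42,
)
tpot.fit(X_train, y_train)

print(tpot.score(X_test, y_test))
tpot.export("best_pipeline.py")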

While TPOT is running, you can check the dashboard at localhost:8787 on your scheduler. If you are not on the remote scheduler, you can check it at IP-of-scheduler:8787 from your local machine. The dashboard provides things like insights into the progress of each generation, a basic system monitor of the workers and the whole cluster, the logs and call stacks of each worker, and a very, very cool graph. Here is what the Dask dashboard should look like when running:

Closing off, I would like to mention that the main contributors of Dask founded a company called Coiled, providing cloud and enterprise solutions for managed Dask clusters and Python scaling in general. They are currently in beta and offer up to 100 free CPUs in the cloud to demo their platform.

