Creating a Distributed Computer Cluster with Python and Dask

How to set up a distributed computer cluster on your home network and use it to calculate a large correlation matrix.


Calculating a correlation matrix can very quickly consume a vast amount of computational resources. Fortunately, correlation (and covariance) calculations can be intelligently split into multiple processes and distributed across a number of computers.

In this article, we will use Dask for Python to manage the parallel computation of a large correlation matrix across a number of computers on a Local Area Network.

What is a Correlation Matrix?

A correlation matrix shows the strength of the linear relationship between each pair of variables in a dataset. Famously, correlation does not imply causation, but we still regularly make use of it as part of our efforts to understand the datasets we work with.

If you just want to go ahead and compute a correlation matrix on your own home cluster of computers then skip this section, but if you are interested in how to actually calculate correlation mathematically then read on.

One easy way to visualise the steps is to recreate them in Excel, so I will use Excel to show you how to calculate the correlation and covariance between four sets of Foreign Exchange data.

First, we take our four sets of time-series data, arrange them in columns, one for each currency pair, and then take the mean (sum of values, divided by number of values) of each set of FX rates:

Then for each time series data point, we calculate the difference of that point from the mean:

We then square each of those differences, which gives us the values we need to calculate the variance of each dataset.

We then calculate the sum of those squared values, divided by our sample size minus one, giving us our variance: 0.0013% for GBPUSD, for example, as seen in row 3. We subtract one from the count of our data points because this is a sample of our dataset (we do not have every single historical data point) rather than the entire population. This is known as the “sample variance”. If we had the entire population we would not subtract one.
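In symbols, for a sample of n observations with mean x̄, the sample variance is:

s^2 = \frac{1}{n-1} \sum_{i=1}^{n} (x_i - \bar{x})^2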

We can also calculate the Standard Deviation at this stage, which is simply the square root of the variance:

We can calculate the Covariance between two given datasets by multiplying the two “Difference from mean” values at each point in time. The example below shows the single point-in-time covariance calculation for GBPUSD and JPYUSD:

We have called GBPUSD dataset A, JPYUSD dataset B, and so on. With four individual datasets we then have 6 combinations for our covariance calculation: AB, AC, AD, BC, BD, and CD. All of these are calculated in the same way, by multiplying the differences from the means of each dataset at each point in time.

We then take the mean of these values, in the same way as we did for the variance — subtracting one from the denominator — to get the covariance:
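In symbols, with n data points and means x̄ and ȳ:

\mathrm{cov}(X, Y) = \frac{1}{n-1} \sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})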

And finally, we can calculate the correlation of these two datasets by dividing the covariance by the product of the two standard deviations:

Mathematically, we can represent this as the covariance divided by the product of the two standard deviations:
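r_{xy} = \frac{\mathrm{cov}(X, Y)}{s_x s_y} = \frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_{i=1}^{n} (x_i - \bar{x})^2 \sum_{i=1}^{n} (y_i - \bar{y})^2}}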

This may look daunting but it is just what we calculated already. Looking at the very right-hand equation, the top of the fraction is saying “take the difference of each data point in dataset X from the mean of dataset X, and multiply that by the equivalent calculation in dataset Y” and then sum all of those values together. The x with the subscript i just means a given point of dataset x, whilst x̅ means the average for the dataset. The same goes for the y-values.

The denominator is saying, again, for every point in X, subtract the mean value of X from that point and then square the value you get. Sum all of those together and do the same for Y. Now multiply those two values together and take the square root of that number. The result is the product of the standard deviations of the two datasets.

This is exactly what we have just calculated for ourselves and it’s exactly what we are doing today, except we are using a home computer cluster to calculate correlations like this across a large number of financial instruments for a significant number of points in time.
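If you want to sanity-check these steps in Python rather than Excel, a short NumPy sketch (with made-up rates, purely for illustration) reproduces each quantity:

import numpy as np

# Two made-up FX series, purely for illustration
gbpusd = np.array([1.30, 1.31, 1.29, 1.32, 1.33])
jpyusd = np.array([0.0091, 0.0092, 0.0090, 0.0093, 0.0094])

# Sample variance and standard deviation (ddof=1 divides by n - 1)
print(np.var(gbpusd, ddof=1), np.std(gbpusd, ddof=1))

# Sample covariance and the correlation coefficient
print(np.cov(gbpusd, jpyusd)[0, 1], np.corrcoef(gbpusd, jpyusd)[0, 1])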

Setting up the Environment

The Network

The first step is to establish your computation environment on every computer that will form part of the cluster. You will need to make sure the computers can see each other on the Local Network (through firewall exceptions). You will also need to note the IP address of each computer.

On Windows you can go to a Command Prompt and type:

ipconfig

This will give you your IP address on the Local Network. On a Linux machine you can type:

hostname -I

Or on macOS the following should work:

ipconfig getifaddr en0

You should note the IP address of each of your machines, as you will need these to set up the cluster. You can also check that each computer can see the others by running the command:

ping <Target IP Address>

If this responds with data then the two computers can see each other; if not, you will need to double-check the IP address and ensure that a firewall is not blocking communication.

We are going to use three computers as part of our small cluster: 192.168.1.1, 192.168.1.2 and 192.168.1.3.

Python and Dask

One very important point to note is that each Python environment must be identical on each computer. If there are inconsistencies in library versions this can cause problems.

If it is easier, use Anaconda or a similar tool to create a new Python environment with matching versions of the libraries you will be using such as Numpy and Pandas.

You will also need to install Dask itself on each of the computers. You can do this using Conda:

conda install dask distributed -c conda-forge

Or you can use Pip:

python -m pip install dask distributed --upgrade

You should now have consistent Python installations, with matching library versions, across all of the computers in your cluster.
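A quick way to confirm that the environments really do match is to print the key library versions on each machine and compare the output:

python -c "import sys, numpy, pandas, dask; print(sys.version); print(numpy.__version__, pandas.__version__, dask.__version__)"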

Sourcing the Data

In this example, I am using a large dataset of daily equities data. This contains the daily close prices of ~7,000 US equities from 2000 to 2020. This would be extremely time-consuming to compute on any one of the personal computers that form part of my cluster. When I tried the calculation initially the system crashed after a few hours due to hitting a memory limit when writing to the page file.

These data are stored in a large SQL table which we are going to access sequentially for each stock and then merge. This proved significantly faster than performing the manipulations in Python.

We will also be using a different implementation of a MySQL Database Connector. The standard MySQL connector library is relatively slow when dealing with large datasets because it is itself written in Python. The C implementation of this connector is much faster and you can find the installation details here:

Once MySQLdb is installed you can begin to query the data. I am firstly extracting a list of all unique, chronologically-ordered dates from the table. This will be the golden source of each timestep in the overall database.

I am then extracting, stock-by-stock, the time series data, and left-joining this to the golden time series source. I then add this individual dataframe to a list of dataframes, one for each stock that has been extracted. I defer the merge operation to combine each stock time series into a single dataset until later.

This query approach can itself be parallelised, where a single query to extract all data from the table could not be.

This leaves us with a list of datasets with a common time-series index which we can then work with.

Here we are extracting our list of stocks, building our golden source for the time-series, and then iterating through the list extracting the time series data for each stock in this common format.
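A minimal sketch of that extraction, assuming the table is called history with date, stock and close columns (as in the query below) and using a placeholder SQLAlchemy connection string for the mysqldb driver, might look like this:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection details: swap in your own user, password, host and database
engine = create_engine("mysql+mysqldb://user:password@localhost/market_data")

# Golden source: every unique date in the table, in chronological order
dates = pd.read_sql("SELECT DISTINCT date FROM history ORDER BY date", engine)

# One query per stock, each left-joined onto the golden date series
stocks = pd.read_sql("SELECT DISTINCT stock FROM history", engine)["stock"].tolist()

dfs = []
for ticker in stocks:
    query = "SELECT date, close as {} FROM history where stock = '{}'".format(ticker, ticker)
    prices = pd.read_sql(query, engine)
    dfs.append(dates.merge(prices, on="date", how="left"))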

The query where we extract the stock data:

SELECT date, close as {} FROM history where stock = '{}'

takes the closing price of the stock and names that column with the ticker for that stock. This means that instead of having 7,000 columns each with the name “close” we will have 7,000 columns each named with the relevant ticker code and mapped to a common time-series. This will be much easier to deal with.

Calculating the Correlation Matrix

Now for the good bit. We are going to get the data in its final format and begin to calculate the correlation matrix.

Here we are setting the date index for each of the dataframes in our list and merging them together. Because we named each column after the relevant stock ticker, we now have one large table which has a common list of dates as an index and a labeled column for each stock. We're going to simplify the table where we can by dropping any column that contains only “NaN” values, which is where there is no relevant data.
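Continuing from the dfs list built earlier, a rough sketch of that step could be:

import pandas as pd

# Index each per-stock frame by date so they all align on the same timesteps
dfs = [df.set_index("date") for df in dfs]

# Combine into one wide table: one row per date, one column per ticker
merged = pd.concat(dfs, axis=1)

# Drop any column that is entirely NaN, i.e. stocks with no usable data
merged = merged.dropna(axis=1, how="all")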

This is the exact format we need to calculate our correlation matrix so we are going to:

import dask.dataframe as dd

and create a Dask dataframe

merged = dd.from_pandas(merged, 20)

This is the time when you will need to make an important design decision that will significantly impact the speed of processing the correlation matrix.

Here we are converting the Pandas dataframe into a Dask dataframe and we also have to specify a number, here “20”, for the number of partitions in the dataset.

Each partition will be treated as an independent unit for the purposes of parallel calculation. If you select “1” for example, the correlation matrix will be calculated by a single thread on a single CPU and you will achieve no parallelisation benefit.

On the other end of the spectrum if you set this number too high your performance will suffer as there is an overhead to loading and processing each task.

Initially, I set this equal to the number of threads across the computers in my mini-cluster. However, I soon found that one of the machines was much slower and the two faster machines sat idle waiting for it to complete. To mitigate this I increased the number of partitions so that the faster computers could continue to work on later tasks whilst the slower one worked through its initially allocated partitions.
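If you want a concrete starting point, once the client is connected (covered below) you can ask the scheduler how many threads each worker is offering and choose a partition count that is a small multiple of the total:

# Threads per worker, keyed by worker address, plus the cluster-wide total
print(client.nthreads())
total_threads = sum(client.nthreads().values())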

After making this decision, we convert the merged dataframe from float32 to float16, as the extra precision is unnecessary and will slow our next calculations.
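That conversion is a one-liner on the dataframe, something like:

# Drop the precision to float16 to roughly halve the memory footprint
merged = merged.astype("float16")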

We then run the critical line:

merged = merged.corr().compute()

There are two functions here: .corr(), which defines the correlation matrix calculation, and .compute(), which tells Dask to actually execute it and return the result.

After this has processed, we will receive a correlation matrix showing the correlation of each stock with every other stock. Below is a simplified illustration of what this could look like (with made-up data, as we haven't calculated the matrix yet):

You will notice there is a lot of repetition and unnecessary data here. The correlation of AAPL with AAPL is 1. The correlation of AMZN with AMZN is 1. This is going to be the case across the diagonal of your entire matrix.

Similarly, there is duplication. The correlation of AMZN with AAPL is clearly the same as the correlation of AAPL with AMZN (it’s the same thing!).

There is no point in duplicating all of this data, particularly for such a large dataset, when you come to save this back in your dataset. So let’s get rid of this duplication.

You will notice that the unnecessary values form half of the matrix, on and below the diagonal. So let's mask out that lower triangle (including the diagonal) before we save down this dataset.

import numpy as np

corrs = merged.mask(np.tril(np.ones(merged.shape)).astype(bool))

This line does exactly that. It creates a matrix of ones in the shape of our correlation matrix, then uses NumPy's tril() function to keep only the lower triangle (including the diagonal) of that matrix, zeroing out everything above it. Casting to bool turns those ones and zeroes into True and False, and Pandas' mask() then blanks out every value where the condition is True. The result, with only the upper triangle remaining, is saved as our new correlation matrix.

We can then convert this simplified matrix into a table which we can upload back into our database.

levels = merged.columns.nlevels
df1 = corrs.stack(list(range(levels))).reset_index()
del df1["level_0"]
del df1["level_2"]
df1.columns = ["Stock 1", "Stock 2", "Correlation"]
print("Uploading SQL")
df1.to_sql("correlations", con=engine, if_exists='append', chunksize=1000, index=False)
print("Process Complete")

Here we convert this back into a table for our database by using the stack() function. This gives us a table that looks something like this:

You can see there is no duplication of values or redundant 1s showing a stock’s correlation with itself. This can then be uploaded back into the database and the process is complete.

Distributed Computation of the Matrix

There is one final, crucial line we must add to our code but first we need to create our cluster.

On each computer you will need to open a terminal with your Python environment active. If you are using Anaconda you can use the GUI to launch a terminal, or you can go to Anaconda Prompt and select your environment using:

conda activate <environment>

You will now need to decide which computer will act as a scheduler and manage the distribution of the tasks and which will act as workers. It is perfectly possible for a computer to act as both scheduler and worker.

On the scheduler, go ahead and enter the following into the terminal:

dask-scheduler

This will launch the scheduler, which will manage the workflow around your cluster. Make a note of the local network IP address of the machine you have done this on; in our case, we will use 192.168.1.1.

We will want this machine to also act as a worker, so we will open another terminal (with the same environment active) and type:

dask-worker 192.168.1.1:8786

This should show that we have established a connection between the worker and the scheduler.

Now we need to repeat this command on each of the other computers in our cluster, with each pointing towards the scheduler hosted at 192.168.1.1.

I mentioned earlier that one of the computers was slower than the others. It did, however, have twice as much RAM, which can be handy for assembling the correlation matrix at the end. Instead of removing it from the cluster entirely, I decided to limit the amount of work it was given by restricting the number of processes and threads available to Dask. You can do this by appending the following to your dask-worker instruction:

dask-worker 192.168.1.1:8786 --nprocs 1 --nthreads 1

Now that your cluster is up and running, you can add the crucial line to your code. Add the following near the top of your Python file (or in any case, above your call to .corr().compute()):

from dask.distributed import Client
from distributed.client import _get_global_client

client = _get_global_client() or Client('192.168.1.1:8786')

Make sure you replace the IP address with that of your scheduler. This will point Dask towards the scheduler to manage the computation.
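Putting the Dask-specific pieces from the earlier sections together, the heart of the script ends up looking roughly like this (the IP address and partition count are the ones used above; adjust both for your own cluster):

import dask.dataframe as dd
from dask.distributed import Client
from distributed.client import _get_global_client

# Reuse an existing client if one is already registered, otherwise connect to the scheduler
client = _get_global_client() or Client('192.168.1.1:8786')

# Partition the merged pandas dataframe, cut the precision, then compute on the cluster
merged = dd.from_pandas(merged, 20)
merged = merged.astype("float16")
merged = merged.corr().compute()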

Now, you can run your program.

Monitoring the Computation

Dask provides an excellent Dashboard for monitoring the progress of your computation.

Go to http://192.168.1.1:8787 in a browser (or whichever IP address you have established your scheduler on, using port 8787).

This will show you the Dask Dashboard so you can track progress and monitor the utilisation of the cores and memory.

The memory and CPU usage of the cluster

The “Workers” page above shows you the nodes in your cluster and their current levels of utilisation. You can use this to track which are doing the most work and make sure this remains in an acceptable range. It is worth noting the CPU utilisation percentage is based on each core, so you can have 200% CPU utilisation if two cores are used, or 800% for eight cores etc.

The Dask Dashboard status screen

The status screen shows the breakdown of the tasks being worked on. The screenshot above shows that we have converted 4 of the 20 partitions from Pandas dataframes into Dask dataframes, with the others being worked on across the cluster. We can also see that we have already stored over 4 GB of data, with plenty more to come.

The Dask Dashboard graph screen

The Graph screen shows graphically how the task has been divided up. We can see that each partition is being handled separately, so there are 20 partitions as expected. Each partition goes through three stages: stage 0, where it is converted from Pandas to Dask; stage 1, where the correlation matrix for that particular partition is calculated; and stage 2, where all the partitions are combined and the cross-correlations are calculated.

The Dask Dashboard System screen

The System screen shows the current system utilisation of resources, including CPU, Memory, and Bandwidth. This shows you how much computational power is being drawn by the cluster at any given time and is useful for seeing the current strain and load on the network.

You can use these screens to watch your calculation progress and you will gradually see the various Tasks turn green as they finish calculating. The results will then be propagated across the network to a single node which will then compile the results and return them to your Python program.

As you have already coded it to do, the resultant matrix will then be slimmed down to its upper triangle, tabulated, and uploaded to your database.

Conclusion

You have now successfully set up a computer cluster and used it to calculate a correlation matrix, simplify your results, and store them away for further analysis.

You can use these core components to do many more interesting things with your cluster, from machine learning to statistical analysis.

Let me know how you get on.

You can follow me on Twitter @mgrint or see more from me at https://grint.tech. Email me at matt@grint.tech.
