Creating and deploying a big data solution

Let me put it simply: the ability to handle large amounts of data has become an absolute need if you work in data analytics or data science. In this article, we will learn to design a solution that you can build using nothing more than your laptop/desktop. No fancy cloud solution is needed!
Today, we will learn the following:
1️⃣ What is Map Reduce
2️⃣ What is Python’s Multiprocessing Module
3️⃣ How to Train & Predict Multiple Models in Parallel
📌 Note: There are multiple tools and solutions available to handle Big Data. The aim here is to understand the basics and then move on to more sophisticated tools. I also assume that the reader has basic knowledge of data science and PyCaret (an open-source, low-code library), and is able to code in Python.
1️⃣ What is Map Reduce?
What naturally comes to mind when we think of dealing with lots of data? An obvious solution is to get a more powerful machine. This idea is called "Vertical Scaling": you still have one machine, just with more power. This, however, turns out not to be a very practical solution. The alternative is "Horizontal Scaling," where multiple small machines are put together and their combined resources are used to solve the problem. To achieve this, we first need to create a network between the machines/computers. You don’t need to worry about building such a network for now; in today’s world it is easy to create and inexpensive to maintain, especially with all the cloud solutions available. It really isn’t hard.

As I mentioned above, creating a network (although it is easy) is out of the scope of this article, and we can still learn and apply the concept with our own personal desktop or laptop. We can use the cores available in our computer in the same fashion as we would use the machines in a network, as long as we have more than one core. After all, your own computer is a mini horizontally scaled solution; it is effectively a network of multiple cores!
Once we have the network (horizontal scaling) in place, all we need is to distribute our data to these machines, use their resources to process the data, and finally collect the results back. The distribution is equivalent to "Map" and the collection is equivalent to "Reduce". That is your map-reduce! All the famous big data solutions are built upon this concept. If you ask me, "map" is the hardest part: that is where one has to devise a strategy/logic for distributing the data across the network so that the problem at hand can be solved. Some problems are easy to distribute, some are harder, and some are practically impossible.
Let us understand this with a very simple example. Say we have an array/list of numbers of size 16. We want to calculate the mean of it. In normal circumstances, we will do something like:
import numpy as np
mean = np.mean([8,1,4,3,2,5,1,2,2,9,7,6,2,4,2,1])
The result would be 3.6875. Now imagine, for some reason, our computer does not have enough power to compute it in one go. Well, can you think of a way to distribute/map this array to all the cores and then solve for the mean?
It is simple. We know that the "mean of means is the same as the overall mean" (as long as the subgroup sizes are kept equal). So we can break our array into smaller arrays of equal size, send them to different cores, have every core calculate the mean of its smaller array, collect those means into a new array, and then calculate the mean of that array of means! We just implemented map (breaking the array up and distributing the pieces to the cores) and reduce (shrinking each piece down to a single number and collecting the results back).

In step 1, we break the array into subsets and send them to different cores. In step 2, every core calculates the mean of the subset it received (📌 Note: all the cores perform the calculation at the same time, not one after another as in a loop!). We save these results into another array, which is now of size 4. In step 3, we ask any core to perform another mean calculation on this array, and we return the final mean to capture/display the result. (📌 Note: if we had an actual network of computers, step 3 would be more involved, as the results would have to be collected from every computer and sent back to the main computer.) That is a small example of map-reduce.
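To make the idea concrete, here is a minimal, purely sequential sketch of the mean-of-means trick (same array as above, split into 4 chunks; no multiprocessing yet, the parallel version comes in the next section):

import numpy as np

data = np.array([8,1,4,3,2,5,1,2,2,9,7,6,2,4,2,1])

# Step 1 (map): break the array into 4 equal-sized chunks
chunks = np.split(data, 4)

# Step 2: compute the mean of every chunk (each of these could run on a different core)
partial_means = [np.mean(chunk) for chunk in chunks]

# Step 3 (reduce): the mean of the partial means equals the overall mean
final_mean = np.mean(partial_means)
print(partial_means, final_mean)  # final_mean == 3.6875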
You can now extrapolate this example to real-world big data problems. Think of applying some aggregation function (a.k.a. group-by operations) to a huge dataset. Without horizontal scaling / map-reduce, it will execute very slowly. We can perform the same aggregation by chopping up the data, processing the chunks in parallel, and then combining the partial results with a little extra logic. That will be much quicker.
Another use case is a situation where you need to perform some operation in a "loop", e.g. you have thousands of separate CSV files that you want to convert to another file format, say XLSX. Or you have to train a separate model for every customer you have data for. On a single machine running sequentially, you would process everything in a loop, one item at a time, and you can imagine how long the whole task would take. On the other hand, if you apply map-reduce, you can "parallelize" the operation and the procedure will finish much, much faster! In fact, that is exactly the problem we will implement in our example.
📌 Note: There can also be a data-size issue, i.e. the data is so large that it does not fit into a single machine’s memory. In that case, you will need a network of computers rather than a single machine. That solution is out of the scope of this article. Nonetheless, map-reduce will still be involved.
2️⃣ What is Python’s Multiprocessing Module?
The built-in multiprocessing module helps us implement the map-reduce strategy we just discussed. Simple as that! Through this library, we instruct Python to run independent parallel procedures, which saves us a lot of time. The multiprocessing library is a great resource, but later on another module called concurrent.futures was added. It is built on top of multiprocessing and makes things much simpler to handle. Although I would recommend checking out the multiprocessing library here, for the sake of simplicity I will use concurrent.futures. To use these libraries, we need to understand a few important points.
📌 The first thing is the structure of the code we are going to run. We will use the map() function of an executor from the concurrent.futures module, and we will provide this function with two parameters:
1️⃣ Target function: the function we want to execute on our data. In the context of our previous example, you can think of it as numpy.mean().
2️⃣ An iterable object: a list/array-like object containing the unique values through which the data can be indexed. You can think of it as a slicer/divider of the data. In the context of our previous example, it would be a list like [1,2,3,4] through which we sliced the data into 4 equal chunks.
The map function will distribute the target function and a single element of the iterable to every core. Since our original data is in the global namespace (meaning all the cores can access it at any time), each core can slice the data (using the single element it received) and apply the target function to that slice. This distribution of the function and the elements across the cores is managed automatically by the concurrent.futures module.
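Here is a minimal sketch of that structure, reusing the mean-of-means example. The function and variable names are mine, purely for illustration, and (as explained in the last point below) this code has to live in a .py file:

import numpy as np
from concurrent.futures import ProcessPoolExecutor

# global data, accessible to every core
data = np.array([8,1,4,3,2,5,1,2,2,9,7,6,2,4,2,1])

def chunk_mean(i):
    # target function: the "i-th step of the loop"
    chunk = data[i * 4:(i + 1) * 4]  # slice the data using the element we received
    return np.mean(chunk)

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # iterable of unique chunk indices; one element goes to each worker
        partial_means = list(executor.map(chunk_mean, [0, 1, 2, 3]))
    print(np.mean(partial_means))  # 3.6875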
📌 The second thing we need to understand is the handling of the input (the data being used), the output (the result we want), and all the variables we may need during the process. In the absence of multiprocessing, we use these things all the time without any issues. With multiprocessing, things are different: a core processing its slice of data with the target function cannot easily share any information or intermediate state with the other cores. The impact of this limitation is that we cannot "easily" update the value of a variable or output. Fortunately, there are some workarounds available. There is a "shared" dictionary available through the multiprocessing module, which we can use to collect results as every core does its job. Alternatively (if your process allows), we can create an empty CSV file and keep appending the results obtained from each core to it. Please note that this second method will only work if you are deploying the solution on one machine; if you have a network of computers, this trick will not work. However, it does give us a lot of flexibility.
📌 The last thing we need to know is that this code structure cannot be run through Jupyter Notebooks. We will need to write the entire code as a Python module, which simply means writing the Python script in a .py file instead of a .ipynb file.
Before we go to an example, a short video I made would help us understand the structure:
A simple example:
Let us do a simple project and apply the concepts I explained above. Say I have sales data of five different stores. I want to calculate the mean of daily sales by store. It is the same as if I want to apply a group by clause with the mean function. Only this time, we will use the concurrent module to get the same result.
Let us create our dataset. Our dataset will contain five stores, and every store will have five daily sales points. At your end, you can increase the data amount and number of stores as you like:
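If you want to follow along without downloading anything, a minimal sketch of such a dataset could look like this (the column names "store" and "sales" and the random values are my own choices, not a fixed format):

import numpy as np
import pandas as pd

stores = [1, 2, 3, 4, 5]
np.random.seed(42)

# five stores, five daily sales points per store
data = pd.DataFrame({
    "store": np.repeat(stores, 5),
    "sales": np.random.randint(10, 100, size=len(stores) * 5),
})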

If you print the dataset, you will see something like this:

We want to transform this data to get the mean of the daily sales per store, and the end result should look like this:

The next step is to create the function that will achieve this transformation. Conceptually, this is the hardest part: the more advanced the transformation you want, the harder this function is to write. In our case, it is a simple mean. The function should be written as if you were coding the i-th step of a "for loop". Additionally, it needs access to the shared dictionary and the CSV placeholder discussed above.
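Here is a sketch of what that target function could look like. I am assuming the DataFrame from the previous snippet is called data, the shared dictionary comes from multiprocessing.Manager(), and the placeholder file is named results_place_holder.csv; all of these names are mine, for illustration only:

import multiprocessing
import pandas as pd

manager = multiprocessing.Manager()
shared_dict = manager.dict()  # shared dictionary, visible to every core

def calc_store_mean(store):
    # the "i-th step of the loop": process one store only
    store_data = data[data["store"] == store]  # slice the global data for this store
    store_mean = store_data["sales"].mean()

    # option 1: put the result into the shared dictionary
    shared_dict[store] = store_mean

    # option 2: append the result to the placeholder CSV file
    pd.DataFrame({"store": [store], "mean_sales": [store_mean]}).to_csv(
        "results_place_holder.csv", mode="a", header=False, index=False)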

All we need now is to "execute" this function through the concurrent.futures module by calling the map function. The map function takes the target function and an iterable (a list in our case) that it can use to slice the data and distribute the work to all the cores. For our example, that iterable is none other than the unique list of stores we have. We can also measure the time it takes to run the parallel procedure.
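A sketch of that final piece, under the same assumed names as above, could look like this:

import time
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    # create the empty placeholder CSV file before the cores start appending to it
    pd.DataFrame(columns=["store", "mean_sales"]).to_csv("results_place_holder.csv", index=False)

    start = time.time()
    with ProcessPoolExecutor() as executor:
        # map distributes calc_store_mean and one store id to each core
        list(executor.map(calc_store_mean, data["store"].unique()))
    print(f"Done in {time.time() - start:.2f} seconds")
    print(dict(shared_dict))  # the results collected by all the cores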

Remember, we need to put all of these snippets into one Python module and then run that module from the command line. The complete, runnable code is available in the repository, and you can run it yourself in your terminal.
Once you run this file in your terminal, you should see the following output:

We also created a placeholder CSV file; read it from the directory where you ran the module and have a look at it. You should see something like this:

That’s it. You successfully ran the multi-processing job and obtained the results. For your experimentation purposes, you can increase the number of stores and the daily sales.
⚠️ Warning: Unfortunately, for Windows users the above process will not run as expected. But we can achieve the same result in a couple of easy ways. The first one is to change the structure of the code: on Windows, the shared dictionary and the placeholder CSV file will not store anything for us. In order to get the results back, we need to return the desired object from the target function (and you can return anything: a float, an array, a dictionary, or any other object). When we run the executor’s map, it gives us back a generator, and all we need to do is consume it with a loop or a list comprehension. Below are the changes that you will need to make:
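Under those constraints, a sketch of the reworked pieces (same hypothetical names as before) could look something like this:

def calc_store_mean(store):
    store_data = data[data["store"] == store]
    # return the result instead of writing to a shared dictionary or CSV file
    return {"store": store, "mean_sales": store_data["sales"].mean()}

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results_gen = executor.map(calc_store_mean, data["store"].unique())
        results = [r for r in results_gen]  # consume the generator with a list comprehension
    print(pd.DataFrame(results))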

The second way, which I like the most (because of its flexibility), is to simply use the Windows Subsystem for Linux (WSL). This is a very handy and easy way to have Linux on your Windows machine without making much of a mess. I have a separate article on it, which you can access here. For the rest of the article, I will assume that you have a Linux/Mac system one way or the other.
If you have followed along and understood everything up to here, you should be able to design and run your own solutions in parallel. You could simply stop here and start building your own. However, the article goes on to a final part where we handle a real-world example involving concurrent.futures and PyCaret.
3️⃣ Train & Predict Multiple Models in Parallel
We have data for a retail business that owns many stores. Every store has daily sales records. In the real world, there would be many items in the store, but for simplicity let us assume that there is only one item to sell. Our job is to predict the sales of each store.
I have prepared some dummy data that you can access through the repository. In the training data, I have sales for 20 stores, where each store has 30 daily sales records. Let us have a look at the data:


We need to predict "sales_qty" and can use all the other columns as "features". The data is clean and has no missing values. In the test dataset, we have to predict the "sales_qty" column. Here is what the test data looks like:

📌 You can download the data and Python modules from my repository.
In the usual circumstances, we would simply loop over the stores, train a separate model for every store, and save the trained models for later use. But we do not want to loop, because a large number of stores, each with a large number of daily sales records, could become too much for the poor loop. We want to get this done in a parallel/concurrent fashion.
📌 Strategy: Our strategy is simple, and the same as before. We will write a training function (the target function) and build an iterable (the unique list of stores). The training function will contain everything needed to completely train a model for a single store. We will then save the trained model in the shared dictionary and append the prediction results to a placeholder CSV file, on the fly. I am assuming that you are familiar with PyCaret and that I do not need to explain its functionality; if not, you can start learning here. It is very easy!
Below is the code that will do the trick. It is nothing more than what we have already seen in our previous example, plus I have added comments to make it easy to follow as you read.
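The full module lives in the repository; what follows is only a condensed sketch of its core idea. I am assuming the files are called train_data.csv and test_data.csv, the store column is store_id, and the target is sales_qty; the exact file names, column names, and PyCaret setup() arguments may differ slightly depending on the repository and your PyCaret version, so treat this as an outline rather than the original code:

import os
import pickle
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import pandas as pd
from pycaret.regression import setup, compare_models, finalize_model, predict_model

train = pd.read_csv("train_data.csv")  # assumed file/column names
test = pd.read_csv("test_data.csv")

manager = multiprocessing.Manager()
saved_models = manager.dict()  # shared dictionary: store id -> trained model

def train_store(store):
    # everything needed to fully train and use a model for ONE store
    store_train = train[train["store_id"] == store]
    store_test = test[test["store_id"] == store]

    # a PyCaret experiment for this store only, with 10-fold cross-validation
    setup(data=store_train, target="sales_qty", fold=10,
          session_id=123, verbose=False, html=False)
    best = compare_models()        # pick the best of PyCaret's candidate models
    final = finalize_model(best)   # refit the winner on all of this store's data

    saved_models[store] = final    # keep the trained model in the shared dictionary

    # predict on this store's test rows and append them to the placeholder CSV, on the fly
    preds = predict_model(final, data=store_test)
    preds.to_csv("predictions_place_holder.csv", mode="a", index=False,
                 header=not os.path.exists("predictions_place_holder.csv"))

if __name__ == "__main__":
    start = time.time()
    with ProcessPoolExecutor() as executor:
        list(executor.map(train_store, train["store_id"].unique()))
    print(f"Finished in {time.time() - start:.0f} seconds")

    # persist the shared dictionary as a regular pickled dict for later use
    with open("saved_models.p", "wb") as fp:
        pickle.dump(dict(saved_models), fp)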
While this code is running in a Linux/Mac terminal, you can check the status of all the cores to see whether the code has made them busy. In your terminal, type htop and you should see something like this:

Once the process ends, it should report the time it took to complete the training. On my end, it took 70 seconds to train models for 20 stores, with 10-fold cross-validation, while choosing each store’s model from a list of more than 20 candidates. That’s fast!
You should have the "predictions_place_holder.csv" file at the end, and it should look like this:

You should also be able to read the shared dictionary, which we saved to our working directory as a pickle file. If you load the pickle file:
import pickle

with open("saved_models.p", "rb") as fp:
    models = pickle.load(fp)
and examine it, you will notice that we have a trained pipeline for every store, where the store number is the key. We can access a single store’s pipeline through indexing, e.g. models[1092], and you should get something like this:

This is a scikit-learn Pipeline object containing the transformers, with the estimator at the end (see "trained_model"), which means you can use its predict method, models[1092].predict(), to get predictions for store 1092 anytime you want!
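For example, reusing the hypothetical test DataFrame and column names from the earlier sketch, getting predictions for store 1092 from the loaded dictionary could look like this:

# keep only store 1092's rows and drop the target column if it is present
store_1092_test = test[test["store_id"] == 1092].drop(columns=["sales_qty"], errors="ignore")
store_1092_preds = models[1092].predict(store_1092_test)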
This brings us to the end of the article. We learnt about map-reduce, Python’s multiprocessing module and how it works, and lastly how we can use the concurrent.futures module to train and save multiple models through PyCaret in parallel. This is just the beginning: you can fine-tune the processing, refactor the functions, and dig into the details of the multiprocessing module as you gain more experience. Hopefully, you are now better equipped to deal with some big data problems. I would appreciate your comments, thoughts & insights!
You can follow me on Medium, connect with me on LinkedIn, and visit my GitHub.
You may also be interested in:
📌 Make your data science life easy with Docker
📌 Custom Estimator With PyCaret, Part 1
📌 Custom Estimator With PyCaret, Part 2
📌 Get Linux Inside Windows The Easy Way