How I reduced data processing time from 5 days to 5 hours

Part 1 of the articles on AI with high-performance computing: data processing in parallel

Bhaskar Agarwal
Towards Data Science
Mar 15, 2022


Image courtesy: CINECA.

In this series of articles, I will outline various ways one can leverage high-performance computing (HPC) in developing faster and more precise machine learning (ML) projects.

As a data scientist in the HPC division of CINECA, I was tasked with day-ahead hail prediction for the EU-funded CYBELE project. The purpose of this project was to develop a hail prediction model to be integrated into the Cybele Demonstrator 3, the result of a collaboration between CINECA, GMV, and CACV, a farmer federation based in Valencia that provided the user requirements and the historical observational data. The work presented in these posts was done with my colleagues at CINECA, namely Giorgio Pedrazzi, Roberta Turra, Gian Franco Marras and Michele Bottazzi.

Overview of the prediction region in red and historical hail events in black. Image courtesy: author.

Hail occurs only when specific atmospheric conditions are met, which makes hail events rare and very hard to predict. Our approach aimed to discriminate hail events from non-hail events, given the weather conditions, by learning from real events that occurred in the past. Our hail prediction model was therefore built with machine learning techniques, using the weather forecast data and the derived climate instability indices as input, and validated against hail event data collected in the field.

The Problem

The input data was provided by ECMWF at a temporal resolution of 1 hour and a spatial resolution of 5 km, covering around 200,000 square kilometres in Spain. The data consisted of 73 fields describing the weather conditions for each grid point (identified by its longitude and latitude) for each hour of the day, where some fields were computed at 65 different levels above the ground. The resulting number of variables was therefore roughly 1000.

Overview of a GRIB file in our raw dataset. Image courtesy: author.

The input data needed to be downscaled to a 2.2 km resolution in order to match the extent of the agricultural parcels and the crops grown in the target region of Spain. The finer grid of 205 × 333 ≈ 68,000 grid points was obtained by small-scale modelling simulations, adapted to the area of interest, that required 640 cores (20 nodes) on an HPC machine and consumed 128 core hours for each forecast. Two forecasts (48 hours and 24 hours in advance) were run for each day in the historical data provided by ECMWF, from 2015 to 2019, amounting to roughly 2100 days. This task was managed by my colleagues at CINECA, namely Gian Franco Marras and Michele Bottazzi. The resulting volume of input data amounted to roughly 6 TB, i.e. 3 GB of data for each day for 5 years.
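The figures quoted above can be cross-checked with a few lines of arithmetic. This is only a back-of-the-envelope sketch: every input is taken from the text, while the wall-clock estimate additionally assumes that all 640 cores are busy for the whole duration of a forecast run.

```python
# Inputs as quoted in the text.
GRID_ROWS, GRID_COLS = 205, 333
DAYS = 2100
FORECASTS_PER_DAY = 2
CORE_HOURS_PER_FORECAST = 128
CORES = 640
GB_PER_DAY = 3

grid_points = GRID_ROWS * GRID_COLS                # size of the fine grid
total_forecasts = DAYS * FORECASTS_PER_DAY         # downscaling runs needed
total_core_hours = total_forecasts * CORE_HOURS_PER_FORECAST
# Assumption: all cores are in use for the full run, so wall time = core hours / cores.
wall_minutes_per_forecast = CORE_HOURS_PER_FORECAST / CORES * 60
total_tb = DAYS * GB_PER_DAY / 1000                # decimal terabytes

print(f"grid points:        {grid_points}")
print(f"forecast runs:      {total_forecasts}")
print(f"total core hours:   {total_core_hours}")
print(f"minutes per run:    {wall_minutes_per_forecast:.0f}")
print(f"input volume in TB: {total_tb:.1f}")
```

Under these assumptions the downscaling alone adds up to several hundred thousand core hours, which is why the data preparation that follows had to be cheap by comparison.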

The Solution

The first task was to make this data ready for ML. The original data was written as hourly GRIB files, a binary format widely used in meteorology. One can think of such a file as a tensor (or N-dimensional array) in which every grid point comes with its own set of values. In our case, each of the final 68,000 grid points has values for the 1000 meteorological features.
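To make the tensor picture concrete, here is a minimal NumPy sketch of how a gridded array can be flattened into a table with one row per grid point. The function name `flatten_grid` and the toy dimensions are illustrative, and the layout (features on the last axis) is an assumption made for the example, not a description of how GRIB stores its fields.

```python
import numpy as np

def flatten_grid(cube: np.ndarray) -> np.ndarray:
    """Turn a (rows, cols, features) array into a (rows * cols, features) table."""
    rows, cols, n_features = cube.shape
    return cube.reshape(rows * cols, n_features)

# Toy stand-in for one hourly file: a 3 x 4 grid with 5 features per grid point.
# The real data would have shape (205, 333, ~1000).
toy_cube = np.arange(3 * 4 * 5, dtype=np.float32).reshape(3, 4, 5)
toy_table = flatten_grid(toy_cube)

# Grid point (r, c) ends up in row r * cols + c of the table.
r, c = 1, 2
same_values = np.array_equal(toy_table[r * 4 + c], toy_cube[r, c])
print(toy_table.shape, same_values)
```

With the real dimensions, the same reshape would give the 68,000-row by 1000-column table described in the next section.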

Data Format

To tabulate the data, I had to write 2100 × 24 ≈ 50,000 files (one for every hour of every day for 5 years of data), each with 68,000 rows (the data samples at that hour) and 1000 columns (the features at that hour). I therefore decided to use a more compact data format than CSV. I went with feather (or parquet), a format that is highly portable and lightweight. Feather files are easily handled in Python once the Apache Arrow bindings, i.e. the ‘PyArrow’ package, are installed. An excellent overview of the format is provided here.

Data Preparation

The task now was to read a binary GRIB file and rewrite it in the form of a feather file. This had to be done for all the 50000 files. While this step was not necessary, it was convenient as it is possible to read feather files with pandas, but the same is not true for GRIB files. The pseudo-code for the IO operation can be defined as follows:

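import pygrib
import pyarrow.feather as feather

def grib_to_feather(grib_file):
    grib_data = pygrib.open(grib_file)  # read the binary GRIB file
    tabulated_file = tabulate(grib_data)  # placeholder: build a table, one row per grid point
    feather.write_feather(tabulated_file, grib_file + ".feather")  # output name is illustrative

# Serial version: one file after another
for grib_file in GRIB_FILES:
    grib_to_feather(grib_file)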

The function grib_to_feather takes about 10 seconds per file on average, so processing all 50,000 files one after another would take more than 5 days!

Data Preparation in Parallel

I decided to parallelise the IO operation under the ‘embarrassingly parallel’ paradigm, so called because little effort is required to parallelise the task at hand, especially as there is no communication between the resulting individual parallel tasks. In our case we can take this approach since the raw data is written as separate files (a file for every hour), and we just need to read each one in and write it out in a different form (and format). In other words, we can read in the raw data and write out the processed files asynchronously.

Carrying out IO operations in parallel asynchronously using 8 CPUs. Image courtesy: author.

At CINECA we host one of the largest supercomputing facilities in Europe. For this task, I decided to use just one node (32 cores) on one of our machines, called ‘Marconi 100’, and modified the code to use the multiprocessing module in Python. The pseudo-code is as follows:

from multiprocessing import Pool

n_workers = 30
with Pool(n_workers) as p:  # the pool is shut down automatically on exit
    p.map(grib_to_feather, GRIB_FILES)

As simple as that! Since the files are independent of each other and the processing is stand-alone for each file, this method works perfectly. The map call applies a function to every element of a list of inputs. Here, the pool starts 30 worker processes, and each task consists of running the function grib_to_feather on one file name taken from the list GRIB_FILES. As soon as a worker finishes its current work (i.e. a raw GRIB file has been read in, processed and written out as a feather file), it is handed the next available file name. The map call takes care of this file-to-processor association under the hood.
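The pattern can be tried out without any GRIB data. The sketch below is a self-contained stand-in, not the script used in the project: `feather_name` and `convert_all` are hypothetical helpers, the worker only derives an output file name instead of converting anything, and the file names are invented. Passing `chunksize=1` makes the pool hand out one file at a time, which matches the description above; by default `map` groups the inputs into larger chunks.

```python
from multiprocessing import Pool
from pathlib import Path

def feather_name(grib_path: str) -> str:
    """Stand-in for grib_to_feather: only derives the output file name."""
    return str(Path(grib_path).with_suffix(".feather"))

def convert_all(paths, n_workers=4, chunksize=1):
    """Run the stand-in on every path, using a pool of worker processes."""
    # Each path is an independent task; results come back in input order.
    with Pool(n_workers) as pool:
        return pool.map(feather_name, paths, chunksize=chunksize)

if __name__ == "__main__":
    # Hypothetical hourly file names for a single day.
    hypothetical_files = [f"2015-01-01T{hour:02d}.grib" for hour in range(24)]
    outputs = convert_all(hypothetical_files)
    print(outputs[:3])
```

The `if __name__ == "__main__":` guard matters on platforms where worker processes are started by re-importing the main module, since it prevents each worker from launching a pool of its own.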

Thus, with a parallel version of my IO script I obtained a speed-up roughly in proportion to the number of workers used, i.e. 4 to 5 hours instead of 5 days!
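As a rough check on these numbers, the ideal runtime can be estimated from the file count, the average time per file and the number of workers. The helper `estimated_hours` is a hypothetical name, and the estimate assumes perfect scaling with no overhead from disk contention or process start-up, so real runs may deviate from it.

```python
def estimated_hours(n_files: int, seconds_per_file: float, n_workers: int = 1) -> float:
    """Ideal wall-clock time in hours, assuming the work splits evenly across workers."""
    return n_files * seconds_per_file / n_workers / 3600

# Figures from the text: ~50,000 files at ~10 s each, 30 worker processes.
serial_days = estimated_hours(50_000, 10) / 24
parallel_hours = estimated_hours(50_000, 10, n_workers=30)

print(f"serial:   {serial_days:.1f} days")
print(f"parallel: {parallel_hours:.1f} hours")
```

The estimate comes out at a little under 6 days for the serial loop and between 4 and 5 hours for 30 workers, in line with the figures quoted in this article.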

In the next article, we will see how one can speed up neural networks on supercomputers with data parallelism.

Note: All images in the article are created by, and property of the author.
