Drop in-memory processing in favour of parallel execution and horizontal scaling
TL;DR: If you’re working with large amounts of data, BigQuery and Dataflow on the Google Cloud Platform (GCP) can boost your efficiency and make your life easier when generating datasets for machine learning.
Recently I was approached by the startup reviewr.ai for a data engineering task that consisted of preparing data for machine learning (ML) and training an ML model. Unlike larger corporations – which can apparently afford to let their data scientists spend days or weeks on such a task because they have to work with on-prem infrastructure or with a cloud provider that was chosen mainly because the company has been licensing its office suite for years – a startup wants to get things done quickly; speed matters. And unlike a lot of corporations, a startup is typically very flexible when it comes to choosing the infrastructure for a given task. We therefore agreed that this should be done on GCP, which in my opinion is still the best cloud platform for all things big data and ML. So instead of spending days, I did the whole data preparation in one morning and took the afternoon off while the ML model was being trained.
This article is not about the machine learning part of the task or the accuracy of the ML model. It’s about the data engineering and preprocessing that has to happen before the machine learning can even begin. As most data scientists would probably agree, that’s usually where the majority of time and effort is spent.
An amazoning task
I was asked to train a simple NLP classification model using the amazon_us_reviews dataset from Hugging Face. It contains almost 151 million customer reviews from the Amazon website, amounting to about 76 GB of data. Based on the product reviews users have written, the model should classify the reviews into certain categories. The class membership (label) can be derived from the data contained in the dataset. Since this problem is pretty standard and because we wanted quick results, I used AutoML on GCP.
To be able to use the data in AutoML, I had to somehow select at most 1 million suitable samples out of the 151 million and convert them into the required format, i.e. one text file per sample containing the review text plus an index CSV file that contains the URIs and the classes of all the samples. Sounds daunting or time-consuming? It’s neither.
BigQuery: all in
Let’s start with getting a copy of the data. The files are stored on an AWS S3 bucket and are publicly accessible via HTTP request. Google Cloud Storage (GCS) is the right place to put them before we process them further. In particular, I’m going to ingest the data into BigQuery for reasons that will become apparent in a moment. There are several ways one can transfer the data to GCS – e.g. with a bash script that wgets or curls the files and copies them to GCS with gsutil cp in a loop, or with a small Python script.
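For illustration, a minimal Cloud Shell sketch of such a loop could look like this – the file list (files.txt) and the S3 base URL are assumptions, so substitute the actual dataset listing and your own bucket name:

#!/usr/bin/env bash
# Download each review file from S3 and copy it to GCS.
# files.txt is assumed to contain one file name per line,
# e.g. amazon_reviews_us_Books_v1_02.tsv.gz
BUCKET="gs://[BUCKET_NAME]/datasets/amazon-us-reviews/raw"

while read -r filename; do
  curl -sSL "https://s3.amazonaws.com/amazon-reviews-pds/tsv/${filename}" -o "${filename}"
  gsutil cp "${filename}" "${BUCKET}/${filename}"
  rm "${filename}"
done < files.txt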
I run this in Cloud Shell rather than on my local machine because transferring the data over Google’s network takes considerably less time. Once all the files are in the GCS bucket, I simply load them into BigQuery in one go using bq, still from Cloud Shell:
bq load --source_format CSV --autodetect --field_delimiter tab \
  --skip_leading_rows 1 --quote '' \
  [DATASET_NAME].amazon_us_reviews \
  gs://[BUCKET_NAME]/datasets/amazon-us-reviews/raw/*.tsv.gz
As you can tell by looking at the flags, I let BigQuery infer the schema from the data and set the field delimiter to tab instead of comma and the quote character to none instead of double quotes ("tab delimited, with no quote and escape characters" as the dataset description states). BigQuery can natively handle gzipped files, so we don’t have to decompress the files first.
After a couple of minutes, the data are stored in BigQuery and ready for inspection:

On a side note: If you have an AWS account (and hence can create an access key ID and a secret access key), you can transfer the files to GCS without writing a single line of code using the Storage Transfer Service, or even ingest the data directly into BigQuery with the BigQuery Data Transfer Service.
Next, let’s run a couple of queries to get a feeling for the data. For example, we could ask how many reviews there are per product group and star-rating and what’s the percentage of star-ratings within the product groups:
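A query along these lines answers that (column names as auto-detected from the data; a sketch rather than necessarily the exact query):

SELECT
  product_category,
  star_rating,
  COUNT(*) AS num_reviews,
  ROUND(100 * COUNT(*) / SUM(COUNT(*)) OVER (PARTITION BY product_category), 2) AS pct_within_category
FROM `[DATASET_NAME].amazon_us_reviews`
GROUP BY product_category, star_rating
ORDER BY product_category, star_rating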

Or: Which product categories get the longest reviews on average?
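Again a sketch, using the dataset’s column names:

SELECT
  product_category,
  AVG(LENGTH(review_body)) AS avg_review_length
FROM `[DATASET_NAME].amazon_us_reviews`
GROUP BY product_category
ORDER BY avg_review_length DESC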

As you can see, running these queries over 151 million rows only takes a couple of seconds. Hence, you can let your creativity go wild and dissect the data as quickly as you come up with questions. (However, keep in mind that you’re billed by the amount of data processed unless you go with flat-rate pricing.) If you want to plot the results, you can visualise them in Data Studio with a single click.
SELECT training data FROM dataset
However, we don’t just want to analyse the data. We also want to generate the training dataset for AutoML which means selecting 1 million samples from the total of 151 million and calculating the features. I don’t want to disclose exactly what reviewr.ai wanted me to work on, so let’s just assume we wanted to infer the rating (1–5 stars) from the review body. Ideally, we’d have 200,000 examples per class in this case.
First of all, we’ll exclude all reviews that don’t have a review body (i.e. WHERE review_body IS NOT NULL). Also, we have to exclude rows that have the same review_body, because the classification in the ground truth may become ambiguous and AutoML won’t allow duplicates anyway. As it turns out, there are quite a few of them:
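One way to count them is to count review bodies that occur more than once (a sketch with the dataset’s column names):

SELECT COUNT(*) AS duplicated_bodies
FROM (
  SELECT review_body
  FROM `[DATASET_NAME].amazon_us_reviews`
  WHERE review_body IS NOT NULL
  GROUP BY review_body
  HAVING COUNT(*) > 1
)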

To narrow down the results further, we’ll focus on reviews a majority of users found helpful. Also, we’ll require that a review received at least x votes. Where to set the threshold x? Let’s build a histogram to get a feeling for how the number of votes per rating is distributed:
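A query like the following buckets the reviews by the number of votes they received, per star rating (the bucket boundaries here are arbitrary; a sketch, not necessarily the exact query):

SELECT
  star_rating,
  CASE
    WHEN total_votes < 10 THEN '00-09'
    WHEN total_votes < 20 THEN '10-19'
    WHEN total_votes < 50 THEN '20-49'
    ELSE '50+'
  END AS votes_bucket,
  COUNT(*) AS num_reviews
FROM `[DATASET_NAME].amazon_us_reviews`
WHERE review_body IS NOT NULL
GROUP BY star_rating, votes_bucket
ORDER BY star_rating, votes_bucket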

If we set the threshold somewhere between 10 and 20, we still seem to have enough samples for every class, so I’ll go with 15. This means we only include reviews that received at least 15 votes, a majority of which were deemed helpful. Of course, we could go even further and require that every product category be equally represented. For the purpose of this article, we’ll leave it at that. So, here is the query that finally defines our dataset:
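Sketched out, it looks something like this – column names per the dataset’s schema, thresholds as chosen above; the exact query may differ:

SELECT review_id, review_body, class
FROM (
  SELECT
    review_id,
    review_body,
    star_rating AS class,
    -- number the rows within each star rating in random order
    ROW_NUMBER() OVER (PARTITION BY star_rating ORDER BY RAND()) AS rn
  FROM (
    SELECT
      review_id,
      review_body,
      star_rating,
      -- count how often each review body occurs
      COUNT(*) OVER (PARTITION BY review_body) AS body_count
    FROM `[DATASET_NAME].amazon_us_reviews`
    WHERE review_body IS NOT NULL
      AND total_votes >= 15                   -- at least 15 votes ...
      AND helpful_votes / total_votes > 0.5   -- ... the majority of them helpful
  )
  WHERE body_count = 1  -- drop all reviews whose body occurs more than once
)
WHERE rn <= 200000      -- at most 200,000 samples per class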
Running a quick SELECT class, COUNT(*) FROM ... GROUP BY class confirms that we indeed have 200,000 samples per class.
Once we’re happy with our query, we can either run it and materialise the result as a table, or save the query itself as a view. Loading the table will be faster than querying the view (which is basically the same as running the query itself). Nevertheless, I chose to create a view because we’re going to query it only once or a few times, so speed doesn’t really matter. The view has the advantage of documenting how the dataset was created, and you can easily modify it and save it under a different name if you want to create a new dataset for another model iteration.
So, now that we’ve defined our training dataset by means of an SQL query – how do we get it into the format we need (one text file per example containing the review body, plus a CSV file with the URIs and classes of all the examples)? Export the query result as a CSV file and write a Python script to process it line by line? That would take an eternity or two (the processing, not the writing of the script). Instead, let’s turn to yet another great tool that is specialised for exactly this kind of workload: Apache Beam and Dataflow on GCP.
Parallel execution for unparalleled speed
To cite the documentation, "Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines". Dataflow is a fully managed runner for Apache Beam on GCP. The usual workflow is like this: Write your pipeline (in Java or Python), test it locally with a few elements to verify it works, then deploy it to Dataflow to process the whole dataset at scale in the cloud.
Apache Beam has a bunch of I/O connectors ("sources" and "sinks"), among others one for BigQuery and one for GCS. This means we can read directly from our BigQuery query result and write the output to a GCS bucket. Conceptually, the pipeline we’re going to build looks like this:

The pipeline reads from one source (the BigQuery query result, containing the review_id, the review_body, and the class) and produces two outputs: the first branch creates the 1 million text files with the review_body; the second one creates 1 million strings containing the URI (gs://...) and the class, then concatenates them and writes the result to a single CSV file. The pipeline operates on a per-row basis, i.e. each element is a row from our query result.
Here’s the full pipeline code:
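What follows is a minimal sketch of such a pipeline rather than the verbatim production code – the output bucket, the BigQuery view name, and the column names are placeholders:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions

OUTPUT_DIR = 'gs://[BUCKET_NAME]/datasets/amazon-us-reviews/automl'


class WriteReviewFile(beam.DoFn):
    """Writes one text file per row, containing just the review body."""

    def process(self, row):
        uri = f"{OUTPUT_DIR}/texts/{row['review_id']}.txt"
        with FileSystems.create(uri) as f:
            f.write(row['review_body'].encode('utf-8'))


def run():
    # Runner, project, region, temp location etc. are passed on the command line.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        rows = p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query='SELECT review_id, review_body, class '
                  'FROM `[DATASET_NAME].training_samples`',
            use_standard_sql=True)

        # Branch 1: one text file per review, named after the review_id.
        rows | 'WriteReviewFiles' >> beam.ParDo(WriteReviewFile())

        # Branch 2: one "uri,class" line per review, written to a single CSV.
        (rows
         | 'FormatIndexLine' >> beam.Map(
             lambda row: f"{OUTPUT_DIR}/texts/{row['review_id']}.txt,{row['class']}")
         | 'WriteIndexCsv' >> beam.io.WriteToText(
             OUTPUT_DIR + '/index', file_name_suffix='.csv', num_shards=1))


if __name__ == '__main__':
    run()

Branch 1 is what Dataflow fans out across many workers, since every file write is independent; the single-shard write in branch 2 is the only step that has to be collected on one worker.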
The beauty of using Apache Beam is that because each row of the query result can be processed independently, we achieve massively parallel processing. As you can see below, Dataflow indeed scaled the number of workers up to a maximum of 527, processing about 2,200 rows per second:


The infrastructure required for the job is fully managed. Dataflow spends a couple of minutes provisioning the infrastructure (Compute Engine instances) before it starts the actual pipeline execution. After the job has completed successfully (or failed), it spins down and deletes the VMs.
Thanks to parallel execution, the equivalent of 72 CPU hours of work was completed in just about 18 minutes. One important note: the extent to which Dataflow scales your workload might be limited by quotas. In particular, the Compute Engine API "in-use IP addresses" quota, which defaults to 8 per region in newly created GCP projects, can limit the number of compute instances provisioned for the job (because every worker instance needs an IP address). Therefore, make sure to request a quota increase in advance should the existing quota turn out to be too low.
And here we have it: 1 million text files and one CSV, ready to be imported into AutoML.


Takeaways
Now that the hassle of preparing this dataset has evaporated into the cloud, let’s draw some conclusions:
- Horizontal instead of vertical scaling. A lot of data scientists I know are fixated on vertical scaling. It doesn’t fit into memory? Let’s get a larger instance with more RAM. However, just throwing more memory at the problem while sticking with in-memory processing forgoes the benefits of parallelising workloads and hence getting more done in less time. For the data analysis in BigQuery I didn’t have to spend a second considering how much memory it would require – I just opened the BigQuery UI and started querying. And the workers for my Dataflow job were all n1-standard-1 VMs with 1 vCPU and 3.75 GB of memory. The only instance that could face a memory bottleneck is the worker that collects and concatenates the 1 million rows before writing them to a single file. In this case, the resulting string used 123 MB, nowhere near the limit. And if we did hit it, we’d just create sharded files instead of one single file.
- Use the specialised data engineering tools on GCP. Why bother launching a Jupyter notebook and loading data into pandas every time you want to work with it when, instead, you can just dump the data into BigQuery, where it is persisted and can be queried ad hoc in almost no time? You’re likely done with your data analysis in BigQuery by the time your Jupyter instance has finished spinning up and you’ve managed to load all the data into a pandas DataFrame. You can still use Jupyter of course – it has its merits for data science tasks after all. But use it to query BigQuery and to write Apache Beam pipelines rather than to process the data in-memory. By the way: there are also a lot of ready-to-use Dataflow templates, so you may not even have to write your own pipeline.
- Lean and reusable process. The workflow with BigQuery and Apache Beam is lean and flexible. Need another training set with different features? Just write the query needed, start the pipeline, and 20 minutes later, you’re already importing your new dataset into AutoML. What if we want to train our own model in TensorFlow with the same datasets? Modify the Apache Beam pipeline slightly to output TFRecord files and you’re set.
- Efficient use of infrastructure. Working with these tools is efficient not only in terms of time spent, but also with respect to infrastructure usage and hence costs. As both BigQuery and Dataflow are serverless, there is no under- or over-utilised infrastructure and there are no fixed costs (besides storage costs, of course, which are negligible in most cases). Given the ephemeral nature of the workload and the limited financial commitment needed, you might want to consider GCP for this kind of data engineering task even if it isn’t your default workhorse.