
TensorFlow Hub makes available a variety of pre-trained models ready to use for inference. A very powerful one is the (Multilingual) Universal Sentence Encoder, which embeds bodies of text written in any language into a common numerical vector representation.

Embedding text is a very powerful natural language processing (NLP) technique for extracting features from text fields. Those features can be used for training other models or for data analysis tasks such as clustering documents or building search engines based on word semantics.
Unfortunately, if we have billions of text documents to encode, the job might take several days to run on a single machine. In this tutorial, I will show how to leverage Spark. In particular, we will use the AWS-managed Elastic MapReduce (EMR) service to apply the sentence encoder to a large dataset and complete the job in a couple of hours.
Configuration
EMR Cluster
In this example, we assume a cluster with one master node (r4.4xlarge) and 50 core nodes (r4.2xlarge spot instances). The cluster has a total of 400 cores and ~3 TB of theoretical memory. In practice, each executor will be limited by YARN to a maximum of ~52 GB of memory.
If spinning up a cluster with that many nodes is not affordable, the total memory size of the cluster should not be a bottleneck: Spark's lazy execution model does not require the whole dataset to be loaded into memory at the same time.
In order to take full advantage of the EMR cluster resources, we can conveniently use the property "maximizeResourceAllocation". Moreover, we also need to configure Livy so that it does not time out our session (not required for spark-submit jobs).
We can achieve both by specifying the following configuration:
[{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},{"classification":"livy-conf","properties":{"livy.server.session.timeout-check":"false"}}]
I would also recommend choosing a recent emr-5.x release and including at least the following software packages: Hadoop 2.8.5, Ganglia 3.7.2, Spark 2.4.4, Livy 0.6.0.
Add the open-to-the-world security groups for the Master and Core nodes (this will be required to access the Spark UI and Ganglia in case the cluster is deployed in a VPC).
Spark session
Create an EMR Notebook and connect it to the previously created cluster. Before creating the session, we need to tune some memory configurations.
Since most of the computation and memory will be used by the Python processes, we need to change the memory balance between the JVM and the Python processes:

The specified executor memory only accounts for the JVM, not for the memory required by external processes, in our case TensorFlow.
We need to increase spark.yarn.executor.memoryOverhead to something greater than 10% of spark.executor.memory, and also tune the allocated spark.python.worker.memory, to avoid unnecessary disk spilling.
We will start by configuring those YARN parameters, before starting the Livy session, by running the following in a notebook cell:
%%configure -f
{ "conf":{ "spark.Pyspark.python": "python3", "spark.pyspark.virtualenv.enabled": "true", "spark.pyspark.virtualenv.type":"native", "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv", "spark.executor.memory": "50g", "spark.yarn.executor.memoryOverhead": "12000", "spark.python.worker.memory": "10g" }}
Out of the 61 GB available, we allocate 10 GB to the Python workers and 50 GB to the JVM, of which 12 GB is overhead.
More details on configuration tuning can be found in Best practices for successfully managing memory for Apache Spark applications on Amazon EMR.
Now we can create a session by executing a cell containing the Spark context object:
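For example, a minimal cell is enough: any cell that references the sc (or spark) object triggers the creation of the Livy session with the configuration above.
# Referencing the Spark context object starts the Livy session on the cluster
sc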

Dependency management
AWS did a good job of making it easy to install libraries at runtime without having to write custom bootstrap actions or AMIs. We can install packages on both the master and the core nodes using the install_pypi_package API:
for package in ["pandas==0.25", "tensorflow==2.1.0", "tensorflow_text==2.1.1", "tensorflow-hub==0.7.0"]:
    sc.install_pypi_package(package)

sc.list_packages()
This installs the provided packages and prints out the list of packages installed in the Python 3.6 virtual environment.
Note: In Hadoop 3.0 (EMR 6.x) it should be possible to deploy a Spark cluster in Docker containers, but I have not tried it yet.
Embedding job
We need to have data loaded as a Spark DataFrame with a key column and a text column.
The embedding job conceptually does the following:
- download the TensorFlow multilingual universal sentence encoder model
- slice the data partitions into chunks of text documents
- embed each chunk in a NumPy matrix
- convert the matrix into a list of pyspark.sql.Row objects
muse_columns = [f"muse_{format(x, '03')}" for x in range(512)]


def get_embedding_batch(batch, model, id_col, text_col, muse_columns):
    # Keep only the rows whose text is non-null and has at least 3 tokens
    rows = [row for row in batch
            if row[text_col] is not None and len(row[text_col].split(" ")) >= 3]
    if len(rows) == 0:
        return []
    from pyspark.sql import Row
    EmbeddingRow = Row(id_col, *muse_columns)
    keys = [x[id_col] for x in rows]
    text = [x[text_col] for x in rows]
    # Run the model on the whole chunk: the result is a (n_rows, 512) numpy matrix
    embedding_mat = model(text).numpy()
    # tolist() converts the numpy float32 values into native Python floats
    return [EmbeddingRow(keys[i], *embedding_mat[i, :].reshape(-1).tolist())
            for i in range(len(keys))]


def chunks(iterable, n=10):
    # Lazily slice an iterable into chunks of at most n elements
    from itertools import chain, islice
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, n - 1))


def get_embedding_batch_gen(batch,
                            id_col,
                            text_col,
                            muse_columns=muse_columns,
                            chunk_size=1000):
    # tensorflow_text must be imported on the executor before loading the model
    import tensorflow_hub as hub
    import tensorflow_text
    # The model is downloaded and instantiated once per partition
    model = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual/3")
    chunk_iter = chunks(batch, n=chunk_size)
    for chunk in chunk_iter:
        for row in get_embedding_batch(batch=chunk, model=model, id_col=id_col,
                                       text_col=text_col, muse_columns=muse_columns):
            yield row
A few gotchas:
- The model is downloaded and instantiated only once per partition; alternatively, we could have used Spark native broadcast variables.
- In order to make the model work at run time, we first have to import tensorflow_text in each executor.
- We transform an iterable of Row objects into another iterable of Row objects while only materializing one chunk of 1000 rows at a time.
- We discard any sentence with fewer than 3 tokens.
- The numpy float32 type is not compatible with the Spark DoubleType; thus, it must be converted into a native Python float first (see the snippet below).
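As a minimal illustration of that last point (the reshape(-1).tolist() call in get_embedding_batch already performs this conversion for us):
import numpy as np

embedding_vec = np.zeros(512, dtype=np.float32)
# numpy.float32 values fail Spark's type verification for FloatType/DoubleType
# columns, while tolist() yields native Python floats that are accepted.
values = embedding_vec.reshape(-1).tolist()
assert all(isinstance(v, float) for v in values)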
Toy example
Let’s try this code with a small data sample:
english_sentences = ["dog", "Puppies are nice.", "I enjoy taking long walks along the beach with my dog."]
italian_sentences = ["cane", "I cuccioli sono carini.", "Mi piace fare lunghe passeggiate lungo la spiaggia con il mio cane."]
japanese_sentences = ["犬", "子犬はいいです", "私は犬と一緒にビーチを散歩するのが好きです"]
sentences = english_sentences + italian_sentences + japanese_sentences
Now we can run inference in batches using the mapPartitions API and then convert the results into a Spark DataFrame containing the key column and the 512 muse embedding columns.
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, FloatType
from pyspark.sql import Row
from functools import partial

# Use string ids to match the StringType declared in the schema below
sentences = [Row(id=str(i), text=sentence) for i, sentence in enumerate(sentences)]
sentence_embeddings_rdd = sc.parallelize(sentences).mapPartitions(
    partial(get_embedding_batch_gen, id_col='id', text_col='text'))
schema = StructType([StructField('id', StringType(), False)] +
                    [StructField(col, FloatType(), False) for col in muse_columns])
sentence_embeddings_df = sqlContext.createDataFrame(sentence_embeddings_rdd, schema)
In the above example, we manually specify the schema to avoid the slowdown of the dynamic schema inference.
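For comparison, here is a sketch of the alternative without an explicit schema (the variable name is illustrative); Spark then has to inspect rows at runtime to infer the 513 column types, which adds extra work compared to passing the schema explicitly:
# Dynamic schema inference: simpler to write but slower on wide rows like these
sentence_embeddings_df_inferred = sqlContext.createDataFrame(sentence_embeddings_rdd)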
Partitioning
The toy example should work straight away because the data sample is tiny. To make it scale to very large datasets, we want neither to hit OutOfMemory errors nor to store the output in many thousands of small parts. We may want to resize the partitions of the sentences RDD (to the order of tens of thousands) before mapping them, and coalesce the embedding data frame to something reasonably small (a few hundred partitions) just before writing to the storage layer, to reduce the number of output files.
Please note that the chunk size slices each partition so that the tensors are not too large during inference, but it does not guarantee that the executor will not hold the whole partition in memory.
E.g. a dataset of 1 billion text documents can be divided into 5k partitions of 200k documents each, which means each partition will contain roughly 200 sequential chunks. The output is then saved as parquet files in 400 parts.
sentence_embeddings_rdd = large_text_corpus_rdd.repartition(5000).mapPartitions(embedding_generator_function)
sentence_embeddings_df = sqlContext.createDataFrame(sentence_embeddings_rdd, schema)
sentence_embeddings_df.coalesce(400).write.option("compression", "snappy").parquet(output_path)
And that’s it: you can monitor the Spark job and eventually access the embeddings, divided into 400 almost equally sized parts in parquet format and compressed with snappy.
Monitoring
The main tools for monitoring a Spark job are the Spark UI and Ganglia.
Spark UI
Execute %%info in the Jupyter notebook to get the list of current and past Livy sessions. From the widget, you also get the links to the Spark UI and the master hostname (from where you can access Ganglia at http://master_hostname/ganglia/). We will need to access those servers through a proxy in case the cluster was deployed in a private network.
From the Spark UI, we would expect a computation graph like the following:


We can observe the two levels of repartitioning: stage 32 repartitions the data before the model inference, and stage 33 repartitions the output before the write operation.
Ganglia
If you open the Ganglia UI and everything was set up correctly, you should see something like this:

If you experience a large imbalance between memory usage and CPU usage, you may want to switch the instance types to a more compute-optimized family than r4.
Each task took approximately 20 minutes to process 80k text sentences, with 8 tasks executing concurrently within the same executor.
Conclusions
This approach can be adapted for running model inference with any machine learning library at any scale. The usage of EMR with spot instances will make it fast and cheap. We used the EMR notebook for convenience but you can wrap the same logic into a spark-submit job and use bootstrap actions for installing packages.
In addition to storing the embeddings in the form of a data frame, you could also extend the code for storing the raw tensors of each partition and load them into TensorBoard for efficient 3-dimensional visualization.
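For instance, here is a minimal sketch of how a sample of the embeddings could be exported in the vectors/metadata TSV format accepted by the TensorBoard Embedding Projector; the helper name and output paths are illustrative:
import pandas as pd

def export_for_projector(embeddings_pdf, id_col, muse_columns, out_dir):
    # Tab-separated vector components, one document per line, no header
    embeddings_pdf[muse_columns].to_csv(f"{out_dir}/vectors.tsv", sep="\t", header=False, index=False)
    # Metadata file with the document keys, aligned line by line with the vectors
    embeddings_pdf[[id_col]].to_csv(f"{out_dir}/metadata.tsv", sep="\t", header=False, index=False)

# Example usage on a small sample collected to the driver:
# sample_pdf = sentence_embeddings_df.limit(10000).toPandas()
# export_for_projector(sample_pdf, "id", muse_columns, "/tmp/projector")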
If instead you are looking for ways to run TensorFlow in distributed mode on top of Spark, you will need a different architecture, as explained in the article Scaling up with Distributed Tensorflow on Spark.
Please leave your comments and subscribe to stay up-to-date to the next tutorials.
Originally published at https://datasciencevademecum.com on May 21, 2020.