Making Sense of Big Data

Overcoming Apache Spark’s biggest pain points

An advanced guide to the most challenging aspects of Spark and how data scientists and engineers can overcome them

Edson Hiroshi Aoki
Towards Data Science
11 min read · Oct 10, 2020

Computer photo created by KamranAydinov — www.freepik.com

It was about 6 years ago that I first used Apache Spark, which at that time was proof of being initiated into the world of “Big Data” analytics. There was no question that mastering Spark was a duty of any wannabe data scientist or data engineer; after all, how else could one leverage massive amounts of data and distributed CPU computing to create the best possible Machine Learning models?

If I could have seen 2020 back then, I would perhaps have been a bit surprised that a large percentage of ML/AI practitioners still do not use Spark, or use it only for data engineering and not machine learning. Part of this is naturally due to the partial shift of interest towards GPU-oriented rather than CPU-oriented Machine Learning techniques, especially deep learning. But for most applications outside image and natural language processing, where the usefulness of CPU-oriented techniques is unquestionable, it is surprising that many data scientists still rely heavily on single-machine ML tools such as Scikit-learn and the non-distributed versions of XGBoost and LightGBM.

Personally, I feel this is a pity because when used properly, Spark is an incredibly powerful tool for anyone who works with data, helping us to avoid wasting time figuring out how to fit large datasets into memory and processors, and allowing us to have full control of the data analytics workflow including extracting data, generating models, and deploying models into production and testing.

Having conducted workshops and coached dozens of data scientists and engineers on Apache Spark, I was able to make sense of the biggest struggles users typically face with the tool, why they happen and how to overcome them. This two-part guide is meant for those who want not only to be able to use Spark, but to truly understand Spark’s internals in order to solve complex problems and write high-performance, stable code.

Note that I assume the reader already has a basic understanding of Spark: for example, what Spark drivers and executors are, that datasets are divided into partitions, what lazy evaluation is, and what Spark’s basic data structures are.

Part 1: Spark’s partitioning and resource management

The challenge

Unlike single-processor, vanilla Python (e.g. Pandas), where the details of the internal processing are a “black box”, performing distributed processing using Spark requires the user to make a potentially overwhelming number of decisions:

  • How many partitions to use for each dataset?
  • When to repartition the dataset?
  • How many Spark executors to use, how much memory and how many cores to allocate to them?
  • How much memory to allocate to the Spark driver?

To make things more complicated:

  • A typical processing pipeline will involve multiple operations, and each operation may result in datasets of dramatically different sizes, making it unlikely that a single set of parameters will fit the entire data pipeline.
  • Certain Spark operations automatically change the number of partitions, making it even harder for the user to keep track of how many partitions are being used for each dataset. For example, a join operation that shuffles data will change the number of partitions of the output dataset to the number specified in the spark.sql.shuffle.partitions configuration parameter.
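
One way to keep track of this is to log the partition count of each intermediate dataset. The sketch below assumes PySpark and an already running SparkSession passed in as `spark`; the helper names `log_partitions` and `demo` and the toy tables are hypothetical. The count reported after the join may differ from spark.sql.shuffle.partitions if Spark chooses a broadcast join or if adaptive query execution coalesces partitions.

```python
def log_partitions(df, label):
    """Print and return the number of partitions backing a DataFrame."""
    n = df.rdd.getNumPartitions()
    print(f"{label}: {n} partitions")
    return n


def demo(spark):
    """Compare partition counts before and after a join (needs a live SparkSession)."""
    left = spark.range(0, 1_000_000).repartition(8)    # partition count requested explicitly
    right = spark.range(0, 1_000_000).repartition(16)  # partition count requested explicitly
    log_partitions(left, "left")
    log_partitions(right, "right")

    joined = left.join(right, on="id")
    # For a join that shuffles, the output usually follows this setting
    # rather than the partition counts of the inputs.
    print("spark.sql.shuffle.partitions =",
          spark.conf.get("spark.sql.shuffle.partitions"))
    return log_partitions(joined, "joined")
```

Calling `log_partitions` after each step that might shuffle (joins, aggregations, repartitioning) is a cheap way to spot where the partition count changes unexpectedly.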

What is important to know

The implications of each decision regarding partitioning and resource management can be summarized as follows:

  1. If we use too many partitions for a particular table, it might lead to slowness/waste of resources for multiple reasons, such as optimization and parallelisation overhead: the total CPU time Spark needs to process a given amount of data as a single partition is much lower than the total needed to process it as multiple partitions. This is the case even when the partitions are processed in the same physical server;
  2. If we use too few partitions for a particular table, it might lead to slowness/waste of resources, as many executors will be “idle” after they finish processing their partitions, waiting for other partitions to finish processing;
  3. If we re-partition too frequently, it might lead to slowness/waste of resources due to shuffling, which consists of re-arranging the partitions across the executors. Shuffling is a high-cost operation, both in terms of processing and memory, and it severely limits the Spark processing optimizations that we will discuss later;
  4. If we don’t re-partition enough, it might lead to slowness/waste of resources due to executor overloading and idleness caused by unbalanced partitions; i.e. a very large partition may both leave its executor under-resourced in terms of cores and memory, and leave other executors idle while they wait for that partition to be processed;
  5. If we use too many cores per executor, it might lead to slowness/waste of resources due to a lower than expected level of parallelisation. When the executor is controlling more than one core, it attempts to create multiple threads to process more than one partition at the same time, allowing in-memory variables to be shared between the threads. However, this parallelisation is not perfect and might be limited by I/O operations (e.g. writing to HDFS or other distributed data storage). Often, a single core will be used even when the user assigned multiple cores per executor. Additionally, since an executor can only run in one physical server, it might not be possible to allocate all cores requested by the user in the executor. The number of cores effectively used by an executor can be monitored by the user via the Spark UI;
  6. If we use too few cores per executor, it might lead to slowness/waste of resources due to the overhead needed to create each executor. Since Spark is aware that cores in the same executor are guaranteed to be in the same physical server, it can optimise the processing such that the parallelisation overhead between them is much smaller than across executors.
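
The trade-offs in points 1, 2 and 4 can be made concrete with a toy scheduling model. This is not Spark's scheduler: it is a pure-Python sketch in which every partition is one task, tasks are handed in order to whichever core frees up first, and each task pays a fixed overhead. The function names, the cost units and the overhead value are all assumptions chosen for illustration.

```python
import heapq


def makespan(partition_costs, n_cores, task_overhead=0.0):
    """Wall-clock time to process all partitions on n_cores.

    Each partition is one task, assigned in order to the first core
    that becomes free; task_overhead is added to every task.
    """
    cores = [0.0] * n_cores          # time at which each core becomes free
    heapq.heapify(cores)
    for cost in partition_costs:
        free_at = heapq.heappop(cores)
        heapq.heappush(cores, free_at + cost + task_overhead)
    return max(cores)


def utilisation(partition_costs, n_cores, task_overhead=0.0):
    """Fraction of the total core time that was spent on useful work."""
    useful = sum(partition_costs)
    return useful / (n_cores * makespan(partition_costs, n_cores, task_overhead))


# 100 units of work on 4 cores, split in different ways
too_few = makespan([50, 50], 4)                        # two cores never get any work
balanced = makespan([25] * 4, 4)                       # one partition per core
skewed = makespan([70, 10, 10, 10], 4)                 # one oversized partition
too_many = makespan([1] * 100, 4, task_overhead=1.0)   # overhead dominates
```

In this model the balanced split finishes in 25 time units, the two-partition and skewed splits take 50 and 70 because cores sit idle, and the 100-partition split takes 50 once each task pays one unit of overhead.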

To handle the variation in the need for partitions and resources across the multiple steps of the pipeline, and the difficulty of keeping track of the number of partitions of each intermediate dataset, there are two strategies, which can be used separately or combined:

  • When the computational resources are shared across a pool of users or the user is running multiple jobs simultaneously, activating dynamic allocation of executors via the spark.dynamicAllocation.maxExecutors and spark.dynamicAllocation.enabled parameters can considerably decrease idleness of Spark’s computational resources;
  • Splitting a large processing job into multiple smaller jobs, each with a smaller variation in dataset sizes, allows us to better tune the configuration parameters and the number of partitions for each step. For each of these smaller jobs, we can also set the parameters spark.default.parallelism and spark.sql.shuffle.partitions appropriately to prevent the need for constant re-partitioning. Pipeline tools like Airflow can be extremely useful for setting up and deploying complex data tasks consisting of smaller jobs.
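
As a rough sketch of how the two strategies combine, each smaller job can carry its own set of Spark settings. The configuration keys below are standard Spark properties, but the helper names `job_conf` and `build_session` and all of the values are illustrative assumptions, not recommendations. Depending on the cluster manager and Spark version, dynamic allocation may also need an external shuffle service or shuffle tracking to be enabled; that is left out here.

```python
def job_conf(max_executors, shuffle_partitions,
             executor_cores=4, executor_memory="8g"):
    """Return Spark settings for one small job as plain key/value strings."""
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        "spark.executor.cores": str(executor_cores),
        "spark.executor.memory": executor_memory,
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
        "spark.default.parallelism": str(shuffle_partitions),
    }


def build_session(app_name, conf):
    """Create a SparkSession with the given settings (requires PySpark)."""
    from pyspark.sql import SparkSession  # imported here so job_conf works without Spark

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    # Executor settings only take effect if no session is already running.
    return builder.getOrCreate()


# Hypothetical values for a light step and a heavy step of the same pipeline
small_step = job_conf(max_executors=10, shuffle_partitions=64)
large_step = job_conf(max_executors=50, shuffle_partitions=2000,
                      executor_memory="16g")
```

Each job launched by the pipeline tool would then call `build_session` with the dictionary that suits the size of the data it handles.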

Part 2: Spark’s immutability, lazy evaluation and execution plan optimization

The challenge

Two of the first things that a Spark user learns are the concepts of immutability and lazy evaluation. Immutability means that Spark datasets cannot be modified; every operation on a dataset will create a new dataset. Lazy evaluation is based on the fact that there are 2 types of operations on datasets:

  • Transformations produce a new Spark dataset as output (Spark has the immutability property so it can never modify existing datasets, only create new ones);
  • Actions take Spark datasets as inputs but result in something other than a Spark dataset, such as writing to storage, creating a local (non-Spark) variable or displaying something in the user’s UI.
Transformations vs actions illustrated (Image by author)

When the process calls a transformation, this results in the creation of an immutable, in-memory “execution plan” that describes how to generate the dataset based on other datasets. However, no actual data is generated at this point. Only when an action is called does Spark evaluate the execution plans of the inputs in order to generate the data necessary to calculate the outputs. Since the inputs themselves may have execution plans that depend on other datasets, these are recursively evaluated, and the process continues.

If at some point another action is triggered that requires the same intermediate dataset, the process repeats itself; the execution plan is re-created and all of its steps need to be executed again. To prevent this from happening, the user can “persist” the intermediate dataset. When a “persisted” dataset is created via triggering an execution plan, the dataset will be saved into the distributed memory (or some configured distributed storage, if there is not enough space in memory), where it will stay until it is manually “un-persisted” or until Spark’s garbage collector identifies that the dataset is “out of scope” (not accessible by the running code).
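
The behaviour described above can be mimicked with a few lines of plain Python. The class below is a toy model, not Spark code: `LazyDataset` and its methods are hypothetical names, a "plan" is just a Python function, and "persisting" simply keeps the computed rows in a local list.

```python
class LazyDataset:
    """Toy stand-in for a Spark dataset: holds a recipe (plan), not data."""

    def __init__(self, compute, name):
        self._compute = compute    # zero-argument function that produces the rows
        self.name = name
        self._persist = False
        self._cache = None
        self.evaluations = 0       # how many times the plan was actually run

    @classmethod
    def from_rows(cls, rows, name="source"):
        return cls(lambda: list(rows), name)

    def map(self, fn, name):
        """Transformation: returns a new dataset and computes nothing."""
        return LazyDataset(lambda: [fn(r) for r in self._rows()], name)

    def persist(self):
        """Mark the dataset to be kept once an action has computed it."""
        self._persist = True
        return self

    def unpersist(self):
        self._persist = False
        self._cache = None
        return self

    def _rows(self):
        if self._cache is not None:
            return self._cache
        self.evaluations += 1
        rows = self._compute()
        if self._persist:
            self._cache = rows
        return rows

    def count(self):
        """Action: runs the plan and returns a local value."""
        return len(self._rows())

    def collect(self):
        """Action: runs the plan and returns the rows as a local list."""
        return list(self._rows())


source = LazyDataset.from_rows(range(5))
squared = source.map(lambda x: x * x, "squared")
runs_before_action = squared.evaluations     # only a plan exists so far

squared.count()
squared.collect()
runs_without_persist = squared.evaluations   # each action re-ran the plan

squared.persist()
squared.count()      # runs the plan once more and keeps the result
squared.collect()    # served from the persisted copy
runs_with_persist = squared.evaluations - runs_without_persist
```

Without persisting, the two actions run the plan twice; after `persist()`, the next two actions run it only once.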

Many Spark users do not think deeply about the concepts of immutability and lazy evaluation and their practical consequences, assuming that it’s sufficient to know that we should “persist” an intermediate dataset when it is going to be used more than once, to prevent duplicated computation when more than one action requires the same intermediate dataset.

The truth, however, is that making good use of immutability and lazy evaluation goes far beyond that. As a matter of fact, they are closely related to a lesser-known aspect: the execution plan optimization done by the Spark Catalyst. Not understanding these concepts in depth can easily lead to painfully slow, unstable Spark processing, as well as a lot of time wasted by the user on debugging and deciphering cryptic error messages.

What is important to know

  • Spark performs transformations at partition level, not dataset level

We may have the false impression that calling an action causes the series of transformations that precede it to be executed one by one, each step generating one intermediate dataset. If that were the case, and DataFrame2 is generated by transforming DataFrame1, Spark would first create the whole of DataFrame1 and then create DataFrame2, as would be the case if we were using Pandas.

But it’s not as simple as that. When an action is called, all the necessary execution plans are actually merged into a single execution plan, and except in the case of shuffling, the processing of each partition is done independently. This means that transformations are being done at partition level, not dataset level. So for one partition, Spark might still be generating DataFrame1, whereas, for another partition, Spark might already be generating DataFrame2 from DataFrame1. For example, we could have a situation as follows:

(Image by author)

From a performance point of view, this is a powerful mechanism, as it prevents processor and memory wastage, i.e. waiting for an entire intermediate dataset to be generated before moving on to the next one. Another advantage of partition-level transformations is that, as we will see in the next section, they allow Spark to better optimise processing.
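
The difference between the two orders of execution can be sketched in plain Python. This is only an illustration of the ordering, not of how Spark runs tasks: the partitions are processed one after another here, whereas in a cluster they would run concurrently on different cores. The function names and the two toy steps are assumptions.

```python
def dataset_level(partitions, steps):
    """Finish each step for every partition before starting the next step."""
    log = []
    for step_name, fn in steps:
        partitions = [[fn(x) for x in part] for part in partitions]
        log.extend((step_name, i) for i in range(len(partitions)))
    return partitions, log


def partition_level(partitions, steps):
    """Push each partition through all steps independently of the others."""
    log, out = [], []
    for i, part in enumerate(partitions):
        for step_name, fn in steps:
            part = [fn(x) for x in part]
            log.append((step_name, i))
        out.append(part)
    return out, log


partitions = [[1, 2], [3, 4]]
steps = [("DataFrame1", lambda x: x + 1), ("DataFrame2", lambda x: x * 10)]

result_a, log_a = dataset_level(partitions, steps)
result_b, log_b = partition_level(partitions, steps)
```

Both functions return the same data, but `log_b` shows partition 0 reaching DataFrame2 before partition 1 has even started DataFrame1, which is the situation described above.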

  • Spark optimizes execution plans, and the larger the execution plans, the better for optimization

The Spark Catalyst is undoubtedly one of Spark’s most valuable features, as efficient distributed processing is far more complex to achieve than efficient single-core or single-memory processing. Essentially, the Catalyst will optimise execution plans to maximise distributed performance. For example, in Spark, performing multiple operations on the same row at a time, and then moving to the next row and so on, is much faster than performing multiple operations on the same column and then moving to the next column, so the execution plan is optimised accordingly.

An important factor to consider is that a single, large execution plan has far more optimization potential than multiple smaller ones. Similarly, by leveraging the fact that transformations are done at partition level, not dataset level, Spark can combine multiple transformations for the same partition and optimise them for even better performance. Taking our previous example, the result after optimisation would look like this:

(Image by author)
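
To see what the Catalyst does with a chain of transformations, the plans can be printed with `DataFrame.explain`. The sketch below assumes PySpark and an existing SparkSession passed in as `spark`; the function names and the toy columns are hypothetical. The optimised plan will typically show the separate projections merged into fewer steps, although the exact output depends on the Spark version.

```python
def chained_plan(spark):
    """Build a chain of transformations without triggering any computation."""
    df = spark.range(0, 1000)                                        # single column "id"
    df = df.selectExpr("id", "id * 2 AS doubled")                    # transformation 1
    df = df.selectExpr("id", "doubled", "doubled + 1 AS plus_one")   # transformation 2
    return df.filter("plus_one % 3 = 0")                             # transformation 3


def show_plans(df):
    """Print the logical plans and the physical plan for df."""
    # extended=True prints the parsed, analysed and optimised logical plans
    # followed by the physical plan. No data is processed by this call.
    df.explain(True)
```

Comparing the "Analyzed Logical Plan" and "Optimized Logical Plan" sections of the output is a quick way to check how much of the chain was combined.
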
  • Spark’s lazy evaluation and partition-level transformations make it more complex to debug

However, from a development/debugging point of view, if something goes wrong or there is a performance bottleneck, lazy evaluation combined with partition-level transformations makes it quite hard for the user to figure out exactly which processing step is causing the error or bottleneck. After all, the Spark UI will only tell the user which triggering action is causing the error or bottleneck, not the actual transformation.

The solution is to “force” Spark to generate one entire intermediate dataset at a time by inserting multiple persist() statements followed by actions, such as count() or take(). However, this code would be highly inefficient due to the aforementioned processor/memory wastage and lack of optimization, so it’s better to employ this sort of solution only for development purposes (not production) and on a temporary basis.
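
A small helper can make this pattern less intrusive while debugging. The sketch below assumes PySpark DataFrames; the name `materialise` and the stage names in the usage comment are hypothetical. It uses count() rather than take() because count() has to process every partition.

```python
import time


def materialise(df, label):
    """Persist df, force it to be computed now, and report how long it took."""
    df = df.persist()
    start = time.perf_counter()
    rows = df.count()    # action: triggers every transformation up to this point
    elapsed = time.perf_counter() - start
    print(f"[{label}] {rows} rows in {elapsed:.2f}s")
    return df


# Hypothetical usage while debugging (remove before going to production):
#   cleaned = materialise(raw.filter("amount > 0"), "filter")
#   joined = materialise(cleaned.join(customers, "customer_id"), "join")
```

If a step fails or is slow, the label printed last (or the one that never gets printed) points to the transformation responsible.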

  • Too large / too many execution plans can also become a problem

One thing that I have observed is that the size of execution plans does not always scale linearly with the number of operations, but sometimes polynomially or exponentially. Since execution plans are stored in the Spark driver’s memory (unlike persisted objects, which are stored in the Spark executors’ memory), this may cause Spark to run out of driver memory or become extremely slow due to the Spark Catalyst’s optimisations. This is somewhat ironic, as the Catalyst is supposed to make the code faster to run when deployed to the Spark executors; but the Catalyst, which runs in the Spark driver, can itself become the processing bottleneck.

This is particularly true for DataFrames with thousands of columns where we need to perform multiple operations across all the columns. Spark’s immutability property aggravates the situation by constantly having new instances of execution plans created and evaluated by the Catalyst.

Persisting can be an effective strategy to deal with this problem. It eliminates the need to create new instances of the execution plan up to the point where the persisted intermediate dataset is generated, and prevents the Catalyst from trying to optimise the part of the execution plan that precedes the creation of the dataset.

Yet for complex, long pipelines, execution plans become exceedingly large, and not even frequent persisting can prevent the slowness caused by optimization or stop the driver from running out of memory, as persisting can prevent multiple instantiations of large execution plans but not their creation in the first place. In addition, too much persisting can become a problem itself, as it takes up distributed memory and storage space. While attempting to unpersist often and/or trying to structure the code to maximise garbage collection certainly helps, this is a formidable engineering task and cannot always prevent depletion of resources.

One solution is a technique named checkpointing, which consists of saving intermediate results to, and loading them from, an appropriate storage system defined by the user. When used efficiently, checkpointing prevents both the huge growth in the size of execution plans and the tying up of too many of Spark’s distributed resources with persisted objects.
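
A minimal sketch of periodic checkpointing is shown below. It uses PySpark's built-in `SparkContext.setCheckpointDir` and `DataFrame.checkpoint`, which is one possible way to implement the idea and not necessarily the approach the text has in mind; writing to and reading back from a file format such as Parquet is another. The function name `apply_with_checkpoints`, the `every` parameter and the directory in the usage comment are hypothetical.

```python
def apply_with_checkpoints(spark, df, transforms, checkpoint_dir, every=10):
    """Apply a list of DataFrame -> DataFrame functions, checkpointing periodically.

    Each checkpoint writes the current data to checkpoint_dir and returns a
    DataFrame whose execution plan starts from the saved data, so the plan
    does not keep growing with every transformation.
    """
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    for i, transform in enumerate(transforms, start=1):
        df = transform(df)
        if i % every == 0:
            df = df.checkpoint(eager=True)   # eager: write the data immediately
    return df


# Hypothetical usage with an existing session and a list of column operations:
#   result = apply_with_checkpoints(spark, features, column_ops,
#                                   "hdfs:///tmp/checkpoints", every=50)
```

The value of `every` is a trade-off: checkpointing more often keeps plans small but adds more writes to storage.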

Another solution, already mentioned, is to split a large processing job into smaller jobs. In this way, execution plans cannot become too large and there is no need to worry about leftover objects in either the driver or executors’ memory.

Conclusion

There is certainly far more to Spark than can be covered in this article, e.g. how to structure your code to improve the quality of the Catalyst’s optimisations. But I hope that this article can help people overcome their difficulties with Spark and leverage its extraordinary power and versatility for both data engineering and machine learning. While Spark remains a challenging tool to work with, I can honestly testify to how much Databricks and the open-source community have improved it in terms of user-friendliness and flexibility in the 6 years I have been using it.

If you want to learn how to use Spark and Apache Hadoop YARN to run multiple distributed jobs in parallel, such as multiple instances of model training, check out my other article.

Data scientist at DBS Bank. All views are solely my own and not from my organization.