6 recommendations for optimizing a Spark job

A guideline of six recommendations that are quickly actionable for optimizing your Spark job

Simon Grah
Towards Data Science

Example of a time-saving optimization on a use case. Image by Author

Spark is currently a must-have tool for processing large datasets. This technology has become the leading choice for many business applications in data engineering. The momentum is supported by managed services such as Databricks, which reduce part of the costs related to the purchase and maintenance of a distributed computing cluster. The most famous cloud providers also offer Spark integration services (AWS EMR, Azure HDInsight, GCP Dataproc).

Spark is commonly used to apply transformations on data, structured in most cases. There are two scenarios in which it is particularly useful. The first is when the data to be processed is too large for the available computing and memory resources; this is what we call the big data phenomenon. The second is when one wants to accelerate a calculation by using several machines within the same network. In both cases, a major concern is to optimise the calculation time of a Spark job.

In response to this problem, we often increase the resources allocated to a computing cluster. This trend is encouraged by the ease of renting computing power from Cloud providers.

The objective of this article is to propose a strategy for optimizing a Spark job when resources are limited. Indeed, we can influence many Spark configurations before using cluster elasticity. This strategy can thus be tested first.

In order to avoid an exhaustive search for the best configuration settings, which is naturally very costly, this post will exhibit actionable solutions to maximise our chances of reducing computation time. Each step will be materialized by a recommendation, as justified as possible.

The strategy presented is said to be greedy, i.e. we make the best choice at each stage of the process without going backwards. The approach is illustrated by a guideline of six recommendations.

Guideline of six recommendations. Image by Author

The purpose is to provide a clear methodology that is easy to test on various use cases. It is then necessary to test these recommendations on a shareable example and to give a template code that allows this experiment to be repeated on another Spark job.

Application on a toy use case

For assessing the optimisation strategy proposed in this article, we used a toy use case to design a Spark job. The processing groups French cities according to weather and demographic variables. This task is called unsupervised classification or clustering in Machine Learning. The example illustrates common features of a Spark pipeline, i.e. a data pre-processing phase (loading, cleaning, merging of different sources, feature engineering), the estimation of the Machine Learning model parameters, and finally the storage of the results to disk. More details on this experiment are available on the code repository.

By following step by step the recommendations detailed in this article, we can observe the influence of the optimization tips and tricks on the following graph.

Image by Author

The configuration settings found are likely a sub-optimal solution. Nevertheless, the approach offers a much faster alternative than an exhaustive search, especially in a big data processing context.

A quick reminder about Spark and some useful notions

Spark in a few words

Apache Spark is an analytics engine for large-scale data processing. It provides an interface for programming entire clusters with implicit data parallelism and fault tolerance and stores intermediate results in memory (RAM and disk).

The processing at the heart of Spark makes extensive use of functional programming to solve the problem of scaling up in Big Data. It is therefore natural that the source code is mainly coded in Scala.

However, Spark also offers APIs in Python, Java, R and SQL at a higher level, offering almost equivalent possibilities without any loss of performance in most cases (except for UDF functions for example). The project is open source and licensed under Apache 2.0.

The diagram below describes the classic operation of Spark processing in a distributed computing cluster.

Image by Author

The Spark driver, also called the master node, orchestrates the execution of the processing and its distribution among the Spark executors (also called slave nodes). The driver is not necessarily hosted by the computing cluster; it can be an external client. The cluster manager manages the available resources of the cluster in real time. Having a better overview than the Spark application, it allocates the requested resources to the Spark driver if they are available. In this article, the use of the cluster manager will not be discussed.

Decomposition of a Spark job

Image by Author

A Spark job is a sequence of stages that are composed of tasks. More precisely, it can be represented by a Directed Acyclic Graph (DAG). An example of a Spark job is an Extract, Transform, Load (ETL) data processing pipeline. Stages are often delimited by a data transfer in the network between the executing nodes, such as a join operation between two tables. Finally, a task is a unit of execution in Spark that is assigned to a partition of data.

Lazy Evaluation

Lazy Evaluation is a trick commonly used for large data processing. Indeed, when data exceeds memory resources, a strategy is needed to optimise the computation. Lazy Evaluation means triggering processing only when a Spark action is run and not a Spark transformation.

Example of Spark actions and transformations. Image by Author

Transformations are not executed until an action has been called. This allows Spark to prepare a logical and physical execution plan to perform the action efficiently.

Let’s take an example to understand why this is useful. An action is called to return the first row of a dataframe to the driver after several transformations. Spark can then reorganise the execution plan of the previous transformations to obtain this first transformed row more quickly. Indeed, only the partition of data containing the first row of that dataframe needs to be processed. This spares both memory and compute from unnecessary processing.
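As a minimal PySpark sketch of this behaviour: the function name and the column name value are hypothetical, and df is assumed to be any dataframe with a numeric value column. The two transformations only extend the execution plan, and nothing is computed until take(1) is called.

```python
def first_transformed_row(df):
    """Return the first row of `df` after two transformations.

    `df` is assumed to be a PySpark DataFrame with a numeric column
    named `value` (a hypothetical name used for illustration).
    """
    # Transformations: each call only extends the execution plan.
    positive = df.filter("value > 0")
    doubled = positive.selectExpr("value * 2 AS doubled")

    # Action: Spark now plans the whole chain and runs only what is
    # needed to return a single row to the driver.
    return doubled.take(1)
```

Calling doubled.explain() before the action would print the plan that Spark has prepared, without running the job.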

Wide and narrow transformations

The Spark transformations are divided into two categories: wide and narrow transformations. The difference between these two types is the need for a redistribution of the data partitions in the network between the executing nodes. This major event is called a shuffle in Spark terminology.

Image by Author

Wide transformations requiring a shuffle are naturally the most expensive. Their processing time grows with the size of the data exchanged and with the network latency in the cluster.

How to modify the configuration settings of a Spark job?

There are three ways to modify the configurations of a Spark job:

  • By using the configuration files present in the Spark root folder. For example, we can customize the following template files:
    conf/spark-defaults.conf.template
    conf/log4j.properties.template
    conf/spark-env.sh.template
    These changes affect the Spark cluster and all its applications.
  • On the command line, using the --conf argument of spark-submit.
  • Directly in the Spark application code.

When the same parameter is set in several places, the values configured directly in the application code take precedence, followed by the arguments passed to spark-submit, and finally the values defined in the configuration files.
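To illustrate the command-line and application-code options, here is a minimal PySpark sketch. The setting values and the helper names build_session and as_submit_args are hypothetical and chosen only for illustration; they are not taken from the code repository of this article.

```python
# Hypothetical settings, for illustration only; tune them for your own job.
SPARK_SETTINGS = {
    "spark.sql.shuffle.partitions": "64",
    "spark.sql.files.maxPartitionBytes": str(128 * 1024 * 1024),
}


def as_submit_args(settings):
    """Render settings as a list of spark-submit `--conf key=value` arguments."""
    args = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


def build_session(app_name, settings):
    """Create a SparkSession with the settings applied in application code."""
    # Imported here so that the helper above can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, as_submit_args(SPARK_SETTINGS) yields the arguments --conf spark.sql.shuffle.partitions=64 --conf spark.sql.files.maxPartitionBytes=134217728, which can be appended to a spark-submit command.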

These configuration parameters are visible as read-only in the Environment tab of the Spark web UI.

Image by Author

In the code associated with this article, the parameters are defined directly in the Spark application code.

Preliminary step: Measure if an optimisation is necessary

Optimizing a process is a time-consuming and therefore costly step in a project. It must be justified beforehand.
Usually, the constraints are linked to the use case and are defined in the service level agreement (SLA) with the stakeholders. We monitor the relevant metrics (e.g. processing time, allocated memory) while checking their compliance with the SLA.
Estimating the time needed to optimise an application and reach an objective is not so easy. It often requires experience in software engineering. This article does not claim to do so, but rather aims to suggest actions that can be applied quickly.
These recommendations point to areas for improvement. It may be useful to start by profiling the job, in order to target the most relevant recommendations.

Recommendation 1: Use the Apache Parquet file format

The Apache Parquet format is officially a column-oriented storage. Actually, it is more of a hybrid format between row and column storage. It is used for tabular data. Data in the same column are stored contiguously.

Image by Author

This format is particularly suitable when performing queries (transformations) on a subset of columns and on a large dataframe. This is because it loads only the data associated with the required columns into memory.
Moreover, as the compression scheme and the encoding are specific to each column according to the typing, it improves the reading / writing of these binary files and their size on disk.
These advantages make it a very interesting alternative to the CSV format. This is the format recommended by Spark and the default format for writing.
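As a short sketch of how this looks with the PySpark DataFrame API: the function names and the paths passed to them are hypothetical, and spark is assumed to be an existing SparkSession. The first helper converts a CSV file to Parquet once; the second reads back only the columns a query needs.

```python
def csv_to_parquet(spark, csv_path, parquet_path):
    """Convert a CSV file with a header row into a Parquet dataset."""
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    df.write.mode("overwrite").parquet(parquet_path)


def read_columns(spark, parquet_path, columns):
    """Load only the requested columns from a Parquet dataset."""
    # Selecting a subset of columns lets Spark skip the others when reading.
    return spark.read.parquet(parquet_path).select(*columns)
```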
If Spark is used with Databricks, another particularly interesting format is the delta format which offers automatic optimisation tools. In this article, we will focus on the open source version of Spark. However, the interested reader is strongly encouraged to explore the integration of Spark into the Databricks ecosystem.

Recommendation 2: Maximise parallelism in Spark

Spark’s efficiency is based on its ability to process several tasks in parallel at scale. Therefore, the more we facilitate the division into tasks, the faster they will be completed. This is why optimizing a Spark job often means reading and processing as much data as possible in parallel. And to achieve this goal, it is necessary to split a dataset into several partitions.

Partitioning a dataset is a way of arranging the data into configurable, readable subsets of contiguous data blocks on disk. These partitions can then be read and processed independently and in parallel. It is this independence that enables massive data processing. Ideally, Spark organises one thread per task and per CPU core. Each task is related to a single partition. Thus, a first intuition is to configure a number of partitions at least as large as the number of available CPU cores. All cores should be occupied most of the time during the execution of the Spark job. If one of them becomes available at any time, it should be able to process a task associated with a remaining partition. The goal is to avoid bottlenecks by splitting the Spark job stages into a large number of tasks. This fluidity is crucial in a distributed computing cluster. The following diagram illustrates this division between the machines in the network.

Image by Author

Partitions can be created:

  • When reading the data by setting the spark.sql.files.maxPartitionBytes parameter (default is 128 MB).
    A good situation is when the data is already stored in several partitions on disk. For example, a dataset in parquet format with a folder containing data partition files between 100 and 150 MB in size.
  • Directly in the Spark application code using the DataFrame API, with the repartition() and coalesce() methods.

Unlike repartition(), the coalesce() method can only decrease the number of partitions, but it does so while avoiding a full shuffle in the network.
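A minimal sketch of both DataFrame methods, repartition() and coalesce(), is given below. The helper name resize_partitions is hypothetical and df is assumed to be a PySpark DataFrame; the idea is simply to pick coalesce() when shrinking and repartition() when growing.

```python
def resize_partitions(df, target):
    """Return `df` arranged into `target` partitions."""
    current = df.rdd.getNumPartitions()
    if target < current:
        # coalesce() merges existing partitions and avoids a full shuffle.
        return df.coalesce(target)
    if target > current:
        # repartition() redistributes all rows, which triggers a shuffle.
        return df.repartition(target)
    return df
```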

One might be tempted to increase the number of partitions by lowering the value of the spark.sql.files.maxPartitionBytes parameter. However, this choice can lead to the small file problem. There is a deterioration of I/O performance due to the operations performed by the file system (e.g. opening, closing, listing files), which is often amplified with a distributed file system like HDFS. Scheduling problems can also be observed if the number of partitions is too large.

In practice, this parameter should be defined empirically according to the available resources.
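As a starting point for such experiments, the rough arithmetic below estimates how many read partitions a given value of spark.sql.files.maxPartitionBytes would produce. This is only an approximation under a simplifying assumption: Spark also takes other factors into account when splitting files, so the real count may differ. The function name is hypothetical.

```python
import math

DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # Spark's default, 128 MB


def estimate_read_partitions(total_bytes, max_partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Rough estimate of the number of partitions created when reading a dataset.

    Assumes the data is split into chunks of at most `max_partition_bytes`;
    Spark's actual splitting logic considers additional parameters.
    """
    return max(1, math.ceil(total_bytes / max_partition_bytes))
```

Comparing this estimate with the number of CPU cores in the cluster gives a first idea of whether the cores will be kept busy, or whether the partitions risk becoming too small and too numerous.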

Recommendation 3: Beware of shuffle operations

There is a specific type of partition in Spark called a shuffle partition. These partitions are created during the stages of a job involving a shuffle, i.e. when a wide transformation (e.g. groupBy(), join()) is performed. The setting of these partitions impacts both the network and the read/write disk resources.

The value of spark.sql.shuffle.partitions can be modified to control the number of partitions. By default, this is set to 200, which may be too high for some processing, and results in too many partitions being exchanged in the network between the executing nodes. This parameter should be adjusted according to the size of the data. An intuition might be to start with a value at least equal to the number of CPU cores in the cluster.
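The intuition above can be sketched as follows. The helper name and the tasks_per_core multiplier are assumptions introduced for illustration; spark is assumed to be an existing SparkSession and total_cores the number of CPU cores available to the job.

```python
def tune_shuffle_partitions(spark, total_cores, tasks_per_core=1):
    """Set spark.sql.shuffle.partitions from the number of CPU cores."""
    # Start with at least one shuffle partition per core, then adjust
    # empirically according to the size of the shuffled data.
    partitions = max(1, total_cores * tasks_per_core)
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
    return partitions
```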

Spark stores the intermediate results of a shuffle operation on the local disks of the executor machines, so the quality of the disks, especially the I/O quality, is really important. For example, the use of SSD disks will significantly improve performance for this type of transformation.

The table below describes the main parameters that we can also influence.

Recommendation 4: Use Broadcast Hash Join

A join between several dataframes is a common operation. In a distributed context, a large amount of data is exchanged in the network between the executing nodes to perform the join. Depending on the size of the tables, this exchange causes network latency, which slows down processing. Spark offers several join strategies to optimise this operation. One of them is particularly interesting if it can be chosen: Broadcast Hash Join (BHJ).

This technique is suitable when one of the merged dataframes is “sufficiently” small to be duplicated in memory on all the executing nodes (broadcast operation). The diagram below illustrates how this strategy works.

Image by Author

The second dataframe is classically decomposed into partitions distributed among the cluster nodes. By duplicating the smallest table, the join no longer requires any significant data exchange in the cluster apart from the broadcast of this table beforehand. This strategy greatly improves the speed of the join. The Spark configuration parameter to modify is spark.sql.autoBroadcastJoinThreshold. The default value is 10 MB, i.e. this method is chosen if one of the two tables is smaller than this size. If sufficient memory is available, it may be very useful to increase this value. Note that setting it to -1 disables automatic broadcasting; to force a broadcast regardless of the threshold, use an explicit broadcast hint on the small dataframe.
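Both levers can be sketched in PySpark as follows: raising the spark.sql.autoBroadcastJoinThreshold parameter, and explicitly hinting that the small dataframe should be broadcast. The helper names and the join key are hypothetical; spark is assumed to be an existing SparkSession and the two arguments of broadcast_join to be PySpark DataFrames.

```python
def raise_broadcast_threshold(spark, megabytes):
    """Allow tables up to `megabytes` MB to be broadcast automatically."""
    threshold_bytes = megabytes * 1024 * 1024
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(threshold_bytes))


def broadcast_join(large_df, small_df, key):
    """Join two dataframes while hinting that `small_df` should be broadcast."""
    # The hint asks Spark to copy `small_df` to every executor, so that
    # `large_df` does not need to be shuffled for the join.
    return large_df.join(small_df.hint("broadcast"), on=key, how="inner")
```

The hint only makes sense when the small dataframe really fits in the memory of each executor; otherwise the broadcast itself can become the bottleneck.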

Recommendation 5: Cache intermediate results

To optimise its computations and manage memory resources, Spark uses lazy evaluation and a DAG to describe a job. This offers the possibility of quickly recalculating the steps before an action if necessary, and thus executing only part of the DAG. To take full advantage of this functionality, it is very wise to store expensive intermediate results if several operations use them downstream of the DAG. Indeed, if an action is run, its computation can be based on these intermediate results and thus only replay a sub-part of the DAG before this action.

Let’s take the following DAG as an example:

Image by Author

To obtain the results of the two actions, the processing steps are described in the two DAGs below.

Image by Author

In order to speed up the execution, one can decide to cache intermediate results (e.g. the result of a join).

Image by Author

The processing of the second action is now simplified. Note that during the first action the results have not yet been stored in memory.

Image by Author

Although caching can speed up the execution of a job, we pay a cost when these results are written to memory and/or disk. It should be tested at different locations in the processing pipeline whether the total time saving outweighs the cost. This is especially relevant when there are several paths on the DAG.

Example of a time saving with a two-column simulated table cached:

N.B. Caching, like any Spark transformation, is performed when an action is run. If the computation of this action involves only a sub-part of the data, then only the intermediate results for that sub-part will be stored. In the previous example, if the take(1) action collecting the first row had been called, only the partition containing that first row would have been cached.

A table can be cached using the following command:
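A minimal PySpark sketch, assuming df is a dataframe that is expensive to compute and is used by two actions (the function name is hypothetical):

```python
def count_then_preview(df, n=5):
    """Run two actions on `df`, computing it only once thanks to caching."""
    # cache() is lazy: it only marks the dataframe for caching.
    cached = df.cache()
    try:
        total = cached.count()    # first action: computes and stores the data
        preview = cached.take(n)  # second action: reuses the cached data
    finally:
        # Free the memory and disk space once the results are no longer needed.
        cached.unpersist()
    return total, preview
```

cache() relies on Spark's default storage level, whereas persist() accepts an explicit storage level as an argument.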

The different caching options are described in the table below:

The full list of options is available here.

Recommendation 6: Manage the memory of the executor nodes

The memory of a Spark executor is broken down as follows:

Image by Author

By default, the spark.memory.fraction parameter is set to 0.6. This means that, once the reserved memory is removed, 60% of the remaining memory is shared between execution and storage, and the other 40% is left for user data structures and Spark's internal metadata. The reserved memory is 300 MB by default and is used to prevent out-of-memory (OOM) errors.

We can modify the following two parameters:

  • spark.executor.memory
  • spark.memory.fraction
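To get a feel for these two parameters, the arithmetic below sketches the breakdown of an executor's memory under Spark's unified memory model. The function name is hypothetical. The storage_fraction argument corresponds to spark.memory.storageFraction (default 0.5), a parameter not discussed above; it sets the share of the execution-and-storage region that is protected for storage.

```python
RESERVED_MB = 300  # reserved memory, fixed by Spark


def executor_memory_breakdown(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate split of spark.executor.memory, in MB."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction    # shared by execution and storage
    storage = unified * storage_fraction  # storage share protected from eviction
    return {
        "reserved_mb": RESERVED_MB,
        "user_mb": usable - unified,
        "storage_mb": storage,
        "execution_mb": unified - storage,
    }
```

For instance, with 4300 MB of executor memory and the default fractions, roughly 1200 MB go to execution, 1200 MB to storage and 1600 MB to user memory, on top of the 300 MB reserved. The boundary between execution and storage is soft: each side can borrow from the other when it is free.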

Conclusion

In this article we have detailed a strategy for optimizing a Spark job. Its main objective is to provide a framework for anyone who wants to optimise a process but has limited time to do so. The guideline of the six recommendations with a greedy approach aims to maximise the probability of reducing computation time. The following diagram summarises the method proposed by associating the recommendations at each stage.

Image by Author

The value of each step in this process was explained with regard to the operation of Spark. Indeed, even if some recommendations on configurations are suggested, it is essential to understand how Spark works under the hood. Each use case has its own specificities, and no method can always be generalised. In this respect, this article is an introduction to optimisation in Spark.

To go further

  • This article aims to avoid the systematic use of dynamic resource allocation in a cluster to accelerate processing. Nevertheless, it is of course interesting to study it. Spark provides the following parameters to manage the elasticity of a cluster according to the workload:
    spark.dynamicAllocation.enabled
    spark.dynamicAllocation.minExecutors
    spark.dynamicAllocation.maxExecutors
    spark.dynamicAllocation.schedulerBacklogTimeout
    spark.dynamicAllocation.executorIdleTimeout
  • Many other table join strategies are available in Spark: https://databricks.com/session/optimizing-apache-spark-sql-joins
  • Of course, Spark places great emphasis on optimizing its processing. For a closer look at the inner workings and current projects, the following links may be useful: Tungsten project, Catalyst optimizer and Adaptive Query Execution (a new feature in version 3.0)

Thanks for reading ;)
