Apache Spark is one of the most popular open-source distributed computing platforms for in-memory batch and stream processing. Although it promises to process millions of records very fast, it can produce unacceptable memory and CPU usage if it is configured improperly at the start. Resource utilization of a Spark application is especially crucial on cloud platforms like AWS, where unnecessary memory and CPU consumption and long-running processes can increase the cost dramatically.
Making Spark perform well comes down to two levels: the configuration level and the code level. The former means configuring Spark correctly up front, while the latter means developing and reviewing the code with performance in mind. In this post, I aim to give an insight into the configuration settings; I intend to write another post about best practices in coding. This post is mainly for PySpark applications running with YARN in cluster mode.

Spark Architecture – In Brief
Before continuing further, I will briefly cover Spark architecture and terminology. Spark uses a master/slave architecture with a central coordinator called the Driver and a set of distributed worker processes called Executors that run on various nodes in the cluster.
The Resource Manager is the decision-making unit for allocating resources among all applications in the cluster, and it is part of the Cluster Manager. The Cluster Manager is a process that controls, governs, and reserves computing resources in the form of containers on the cluster. There are many cluster manager options for Spark applications; one of them is Hadoop YARN.
When a Spark application launches, the Resource Manager starts the Application Master (AM) and allocates one container for it. The AM coordinates the execution of all tasks within its application. It can be considered a non-executor container with the special capability of requesting containers from YARN, and it takes up resources of its own.
Once the AM launches, it requests containers from the Resource Manager, specifying the resource requirements of each container. After the containers are granted, the AM launches them and runs the user application code in them via the Driver. After the application completes, the AM releases the resources back to the Resource Manager. If the AM crashes or becomes unavailable, the Resource Manager can create another container and restart the AM on it.
In cluster mode, the Driver is placed inside the AM. It is responsible for converting a user application into smaller execution units called tasks and scheduling them to run on executors. These tasks are executed on the worker nodes and return their results to the Driver. The Driver also informs the AM of the application's executor needs. It runs in its own JVM. Executors are the processes (computing units) on the worker nodes whose job is to complete the assigned tasks. One executor container is exactly one JVM.
The Spark Context is the main entry point into Spark functionality. It also tracks the state of executors in real time through regular heartbeat messages. The Spark Context is created by the Driver for each Spark application when it is first submitted by the user, exists throughout the lifetime of the application, and stops when the application finishes.
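As a quick illustration, here is a minimal PySpark sketch (the application name is illustrative) showing how the Spark Context comes into existence through a SparkSession and lives until the application stops:

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the Spark Context is created behind it.
spark = SparkSession.builder \
    .appName("entry-point-example") \
    .getOrCreate()

sc = spark.sparkContext    # the Spark Context, the main entry point
print(sc.applicationId)    # it exists for the lifetime of the application

spark.stop()               # the Spark Context stops when the application finishes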
There are two running modes available for Spark jobs: client mode and cluster mode. The difference basically comes down to where the Driver runs: in the client process in client mode, and in the AM in cluster mode. In production, cluster mode makes sense, since the client can go away after initializing the application.
YARN Dependent Parameters
One of the leading cluster management frameworks for Spark is YARN. In YARN terminology, executors and the AM run inside containers. A container is simply an allocation of memory and CPU. When a Spark application is submitted through YARN in cluster mode, the resources are allocated in the form of containers by the Resource Manager.
In the yarn-site.xml file, adjusting the following parameters is a good starting point when Spark is used together with YARN as the cluster management framework. If Spark applications run alongside non-Spark applications in the same cluster, setting these YARN parameters correctly is especially important: in a way, they define the borders of your Spark applications within the cluster. The following are the basic YARN parameters to set correctly.
yarn.nodemanager.resource.memory-mb
yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb
yarn.nodemanager.resource.cpu-vcores
yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores
yarn.nodemanager.resource.memory-mb simply refers to the amount of physical memory that can be allocated to containers on a single node. It must be lower than the total RAM of the node, to leave room for the OS daemons and other processes running on the node. yarn.scheduler.minimum-allocation-mb and yarn.scheduler.maximum-allocation-mb state the minimum and maximum memory allocation, respectively, that a single container can get.
Similar reasoning applies to the CPU allocation of containers. yarn.nodemanager.resource.cpu-vcores determines the total number of vcores available to all containers on a single node, and each container gets vcores within the lower and upper limits set by yarn.scheduler.minimum-allocation-vcores and yarn.scheduler.maximum-allocation-vcores.
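As a rough sketch of how these limits interact, the snippet below (with illustrative node capacities chosen to echo the worked example later in the post) estimates how many containers of the maximum allowed size could fit on a single node:

# Illustrative YARN settings for one node (assumed values)
node_memory_mb = 63 * 1024        # yarn.nodemanager.resource.memory-mb
max_container_mb = 21 * 1024      # yarn.scheduler.maximum-allocation-mb
node_vcores = 15                  # yarn.nodemanager.resource.cpu-vcores
max_container_vcores = 5          # yarn.scheduler.maximum-allocation-vcores

# Containers of maximum size that fit on the node, bounded by both resources
by_memory = node_memory_mb // max_container_mb
by_vcores = node_vcores // max_container_vcores
print(min(by_memory, by_vcores))  # -> 3 containers per node in this example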
Memory Management in Spark
Memory utilization is a bit trickier than CPU utilization in Spark. Before diving into configuration tuning, it is useful to look at what is going on under the hood of memory management. A good summary of memory management in Spark is depicted in the following diagram.

An executor container (which is one JVM) allocates a memory area that consists of three sections: heap memory, off-heap memory, and overhead memory. Off-heap memory is disabled by default via the spark.memory.offHeap.enabled property. To use off-heap memory, enable it and set its size with spark.memory.offHeap.size. A detailed explanation of the usage of off-heap memory in Spark applications, with its pros and cons, can be found here.
Memory overhead can be set with the spark.executor.memoryOverhead property; by default it is 10% of executor memory with a minimum of 384 MB. It covers expenses like VM overheads, interned strings, and other native overheads.
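As a hedged sketch (the sizes below are illustrative, not recommendations), these properties are usually supplied at submit time via spark-submit --conf, but they can also be set while building the session, before the context is created:

from pyspark.sql import SparkSession

# Illustrative memory-related settings
spark = SparkSession.builder \
    .appName("memory-settings-example") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.executor.memoryOverhead", "3g") \
    .getOrCreate()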
And then there is the heap memory, where the fun starts. Unlike off-heap memory, all objects in heap memory are managed by the garbage collector (GC). For the sake of simplicity, the heap can be considered to consist of three regions: Reserved Memory, User Memory, and a unified region containing execution and storage memory. Reserved Memory stores Spark's internal objects; it is hardcoded and equal to 300 MB.
User Memory is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records. It is estimated by the formula below using the spark.memory.fraction property, which determines how much of the JVM heap is used for the unified Spark memory region. Its default (and recommended) value is 0.6, so User Memory is roughly 40% of the executor heap after Reserved Memory is subtracted.
User Memory = (Heap Size - 300MB) * (1 - spark.memory.fraction)
# where 300MB stands for reserved memory and the spark.memory.fraction property is 0.6 by default
In Spark, execution and storage share a unified region. When no execution memory is used, storage can acquire all the available memory, and vice versa. If necessary, execution may evict storage, but only down to a threshold set by the spark.memory.storageFraction property; beyond this limit, execution cannot evict storage under any circumstances. The default value of this property is 0.5. This mechanism is called the dynamic occupancy mechanism, and this unified memory management has been Spark's default behavior since version 1.6. The initial sizes of execution and storage memory are calculated with the following formulas.
Execution memory = Usable Memory * spark.memory.fraction * (1 - spark.memory.storageFraction)
Storage memory = Usable Memory * spark.memory.fraction * spark.memory.storageFraction
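To make the formulas concrete, here is a small worked sketch assuming an 18 GB executor heap (the value derived in the sizing example later in the post) and the default fraction values:

heap_gb = 18.0
reserved_gb = 0.3          # hardcoded 300 MB of Reserved Memory
memory_fraction = 0.6      # spark.memory.fraction (default)
storage_fraction = 0.5     # spark.memory.storageFraction (default)

usable_gb = heap_gb - reserved_gb
user_gb = usable_gb * (1 - memory_fraction)
execution_gb = usable_gb * memory_fraction * (1 - storage_fraction)
storage_gb = usable_gb * memory_fraction * storage_fraction

print(f"User: {user_gb:.1f} GB, Execution: {execution_gb:.1f} GB, Storage: {storage_gb:.1f} GB")
# -> User: 7.1 GB, Execution: 5.3 GB, Storage: 5.3 GB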
Execution memory is used to store temporary data during shuffles, joins, aggregations, sorts, etc.; note that the actual data manipulation happens in this part. Storage memory, on the other hand, is used for caching and broadcasting data. As expected, execution memory has priority over storage memory: the execution of a task is more important than cached data, and the whole job can crash if execution memory is insufficient. Also keep in mind that eviction comes with a cost when adjusting the parameters of the dynamic occupancy mechanism, and this cost depends on the storage level of the cached data, such as MEMORY_ONLY or MEMORY_AND_DISK_SER. A clear explanation of memory management in Spark can be found here, and another view of memory management, in terms of garbage collection, here.
Main Configuration Settings
After setting the corresponding YARN parameters and understanding memory management in Spark, we can pass to the next step: setting the internal Spark parameters.
Setting the configuration parameters listed below correctly is very important and basically determines the resource consumption and performance of a Spark application. Let's take a look at them.
spark.executor.instances: Number of executors for the spark application.
spark.executor.memory: Amount of memory to use for each executor that runs the task.
spark.executor.cores: Number of concurrent tasks an executor can run.
spark.driver.memory: Amount of memory to use for the driver.
spark.driver.cores: Number of virtual cores to use for the driver process.
spark.sql.shuffle.partitions: Number of partitions to use when shuffling data for joins or aggregations.
spark.default.parallelism: Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join and aggregations.
It is easier to understand the reasoning behind these settings through an example. Let's assume we have a cluster consisting of 3 nodes with the capacities depicted in the following visual.

The first step is to set spark.executor.cores, which is mostly a straightforward property. Assigning a large number of vcores to each executor decreases the number of executors, and thus the parallelism; assigning a small number of vcores to each executor results in a large number of executors, which might increase the I/O cost of the application. In light of these criteria, it is generally set to 5 as a rule of thumb.
The second step is to decide on the spark.executor.instances property. To calculate it, we first determine the number of executors per node. One vcore per node might be reserved for Hadoop and OS daemons; this is not a strict rule, and you might ask your system admins for help in deciding these values. One executor slot is also left out of the total for the Application Master, which hosts the Driver in cluster mode; that is the reason for subtracting 1 in the second formula below.
executor_per_node = (vcore_per_node - 1) / spark.executor.cores
executor_per_node = (16 - 1) / 5 = 3
spark.executor.instances = (executor_per_node * number_of_nodes) - 1
spark.executor.instances = (3 * 3) - 1 = 8
The third step is to decide on the spark.executor.memory property. To derive it, the total available memory per executor is calculated first, and then the memory overhead is subtracted from it. Similarly, 1 GB per node might be reserved for Hadoop and OS daemons. Note that running executors with too much memory often results in excessive garbage collection delays.
total_executor_memory = (total_ram_per_node - 1) / executor_per_node
total_executor_memory = (64 - 1) / 3 = 21 (rounded down)
spark.executor.memory = total_executor_memory * 0.9
spark.executor.memory = 21 * 0.9 = 18 (rounded down)
memory_overhead = 21 * 0.1 = 3 (rounded up)
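The same sizing arithmetic can be expressed as a short script. The node specs below are the ones from the example cluster (3 nodes, 16 vcores and 64 GB RAM each), and the 0.9/0.1 split reflects the default 10% memory overhead:

import math

number_of_nodes = 3
vcores_per_node = 16
ram_per_node_gb = 64
executor_cores = 5                                            # spark.executor.cores (rule of thumb)

# Reserve 1 vcore and 1 GB per node for Hadoop/OS daemons
executors_per_node = (vcores_per_node - 1) // executor_cores  # -> 3
executor_instances = executors_per_node * number_of_nodes - 1 # -> 8 (one slot left for the AM)

total_executor_memory = (ram_per_node_gb - 1) // executors_per_node  # -> 21
executor_memory = math.floor(total_executor_memory * 0.9)            # -> 18 GB (spark.executor.memory)
memory_overhead = math.ceil(total_executor_memory * 0.1)             # -> 3 GB

print(executor_instances, executor_memory, memory_overhead)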
spark.driver.memory can be set to the same value as spark.executor.memory, just as spark.driver.cores is set to the same value as spark.executor.cores.
Another prominent property is spark.default.parallelism, which can be estimated with the help of the following formula. The recommendation is 2-3 tasks per CPU core in the cluster. Spark reuses one executor JVM across many tasks and efficiently supports tasks as short as about 200 ms, so the level of parallelism can be set higher than the number of cores in the cluster. Although the result of the formula gives a clue, it is encouraged to also take the partition size into account when adjusting the parallelism; the recommended partition size is around 128 MB. If the data volumes of different shuffle operations in the execution flow are on very different scales, a single session-level value of spark.default.parallelism might not fit all of them; in that case, the partition count can be adjusted per operation with repartition and/or coalesce.
spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
spark.default.parallelism = 8 * 5 * 2 = 80
In the case of DataFrames, spark.sql.shuffle.partitions can be set along with the spark.default.parallelism property.
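Tying the worked example together, a sketch of a session configured with the derived values might look like the following; in cluster mode the driver settings must be supplied at submit time (e.g. via spark-submit --conf), so treat the builder form as illustrative:

from pyspark.sql import SparkSession

# Values derived in the worked example above
spark = SparkSession.builder \
    .appName("tuned-app") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "5") \
    .config("spark.executor.memory", "18g") \
    .config("spark.driver.cores", "5") \
    .config("spark.driver.memory", "18g") \
    .config("spark.default.parallelism", "80") \
    .config("spark.sql.shuffle.partitions", "80") \
    .getOrCreate()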
Note that all of the configuration approaches above are based on maximizing the utilization of available resources. But what if multiple Spark applications are live on the same cluster and share a common resource pool? At that point, taking advantage of the spark.dynamicAllocation.enabled property might be an alternative. It is used together with spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors, and spark.dynamicAllocation.maxExecutors. As the property names suggest, an application starts with an initial number of executors and then scales the number up when the execution load is high, or down when executors sit idle, within the configured lower and upper limits.
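A minimal sketch of such a setup, with illustrative executor counts, might look like this; note that on YARN, dynamic allocation also typically requires the external shuffle service (or shuffle tracking in newer Spark versions):

from pyspark.sql import SparkSession

# Illustrative dynamic allocation settings; usually passed via spark-submit --conf
spark = SparkSession.builder \
    .appName("dynamic-allocation-example") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.initialExecutors", "2") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()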
Another point of view might be to apply an experimental methodology in environments running multiple Spark applications. To clarify, start with a configuration that satisfies your constraints, such as the allowed running time; for instance, a scheduled Spark application that runs every 10 minutes is not expected to last more than 10 minutes. Then decrease the resources step by step as long as the constraints are not violated.
Fair Scheduler
I also highly encourage you to take a look at the fair scheduler in YARN if you have an environment in which lots of Spark applications are running. It provides a higher-level abstraction for managing how resources are shared by multiple Spark applications. The goal is for all applications to get a more or less equal share of resources over time, and not to penalize applications with shorter execution times. Its configuration is maintained in two files: yarn-site.xml and fair-scheduler.xml.
In the fair scheduler, resource management is done through queues, in terms of memory and CPU usage, and resources are shared fairly between these queues. The principal properties of a queue are minResources, maxResources, weights, and schedulingPolicy. To make it more concrete, assume that, for some reason, you use the same environment both for developing new models and for running scheduled applications in production. Separate queues might be defined for dev and prod applications; similarly, different queues might be defined for applications triggered by different users. Moreover, different weights might be assigned to different queues, so applications in the corresponding queues get resources in proportion to those weights. You can find a simple answer here to the question of using the fair scheduler and Spark configuration properties together.
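For completeness, a PySpark application can target a specific queue through the spark.yarn.queue property; the queue name below is illustrative and must exist in your fair scheduler configuration:

from pyspark.sql import SparkSession

# Submit this application's containers to the (hypothetical) "dev" queue
spark = SparkSession.builder \
    .appName("queued-app") \
    .config("spark.yarn.queue", "dev") \
    .getOrCreate()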
USEFUL LINKS
Apache Spark Optimization Techniques
How-to: Tune Your Apache Spark Jobs (Part 2) – Cloudera Blog
Best practices for successfully managing memory for Apache Spark applications on Amazon EMR |…