Apache Spark Performance Boosting

A comprehensive guide to performance tips for PySpark

Halil Ertan
Towards Data Science


Apache Spark is a widely used distributed data processing platform specialized for big data applications, and it has become the de facto standard for processing big data. Thanks to its distributed and in-memory working principles, it is expected to perform fast by default; nonetheless, that is not always the case in real life. This post is a follow-up to my previous post about setting configuration parameters to optimize memory and CPU allocation in Spark. Here, I will mention some useful coding practices for developing in PySpark that improve performance in terms of running time, memory, and CPU usage.


1 — Join by broadcast

Joining two tables is one of the main operations in Spark. It usually requires a shuffle, which has a high cost due to data movement between nodes. If one of the tables is small enough, no shuffle may be needed at all: by broadcasting the small table to every node in the cluster, the shuffle can simply be avoided.

Let's assume that you are working on a field force dataset and have a data frame named df_work_order which contains the work orders that the field force teams handle. Additionally, you have another data frame, df_city, that includes the city information of the field force teams. While there are more than 100M rows and lots of columns in df_work_order, there are approximately 100 records in the df_city data frame. To add city information to the df_work_order data frame, broadcasting the small table works fine.

from pyspark.sql.functions import broadcast

df_work_order = df_work_order.join(broadcast(df_city), on=['TEAM_NO'], how='inner')

The maximum size for a broadcast table is 8GB. Spark also internally maintains a table-size threshold below which it applies broadcast joins automatically. The threshold can be configured with spark.sql.autoBroadcastJoinThreshold, which is 10MB by default.
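As a hedged sketch, the threshold can be adjusted at runtime on the session configuration; the value is given in bytes, the 50MB below is only an illustration, and -1 disables automatic broadcast joins entirely.

spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 50 * 1024 * 1024)  # raise the threshold to 50MB
# spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)              # disable automatic broadcast joins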

2 — Replace Joins & Aggregations with Windows

A common pattern is to perform an aggregation on specific columns and keep the results inside the original table as a new feature/column. As expected, this operation consists of an aggregation followed by a join. As a more optimized option in most cases, the Window class can be used to perform the same task. Since it is such a frequent pattern, I find it worth mentioning. A simple benchmark and DAG (Directed Acyclic Graph) representations of the two methods can be found here.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# first approach: aggregate, then join the result back
df_agg = df.groupBy('city', 'team').agg(F.mean('job').alias('job_mean'))
df = df.join(df_agg, on=['city', 'team'], how='inner')

# second approach: compute the aggregate over a window, no join needed
window_spec = Window.partitionBy(df['city'], df['team'])
df = df.withColumn('job_mean', F.mean(F.col('job')).over(window_spec))

3 — Minimize Shuffles

Spark operators are often pipelined and executed in parallel processes. However, a shuffle breaks this pipeline. Shuffles are materialization points and trigger a new stage within the pipeline. At the end of each stage, all intermediate results are materialized and used by the next stages. Within each stage, tasks run in parallel.

In principle, a shuffle is a physical movement of data across the network that is also written to disk, causing network and disk I/O as well as data serialization, which makes shuffling a costly operation. In other words, it is the redistribution of data for a reason. In Spark, those reasons are transformations like join, groupBy, reduceByKey, repartition, and distinct. These are very common transformations, so shuffles are nearly inevitable in Spark applications. However, mitigating shuffling is our responsibility. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner.

To decrease network I/O during shuffles, clusters with fewer machines, each with larger resources, might be created. However, this is entirely a design decision and should not be made only by considering how to minimize shuffles.

  • Joins from the point of shuffles

As I mentioned before, join is one of the prevalent operations that require a shuffle. Since it is a very common transformation and the shuffle in a join might be preventable, I want to discuss it in a separate section. Spark provides three different algorithms for joins: SortMergeJoin, ShuffleHashJoin, and BroadcastHashJoin. Since version 2.3, SortMergeJoin is the default join algorithm. Optimum performance can be achieved with BroadcastHashJoin; however, it has very strict limitations on the size of the data frames.

A shuffle might be avoided, but of course with a trade-off. Most of the time, the shuffle during a join can be eliminated by applying other transformations to the data, which also require shuffles. The point is how many extra shuffles you create, and in return, how many shuffles you prevent. Additionally, the data volume in each shuffle is another important factor that should be considered: one big shuffle or two small shuffles? The answers to these questions are not straightforward; if they were, it would be the default behavior of Spark. It really depends on the data you are working with.

As a rule of thumb, if each partition of the first table is used by at most one partition of the second table in the join, there is simply no need to shuffle. However, if each partition of the first table may be used by multiple partitions of the second table, a shuffle is needed. In this manner, we can avoid the shuffle by repartitioning or bucketing both tables on the same key values before performing the join. Keep in mind that these operations also require a shuffle.

Shuffling during join in Spark
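As a sketch of the repartition-before-join idea mentioned above (the frame names, key column, and partition count are only illustrative), pre-partitioning both sides on the join key with the same number of partitions lets the join itself run without an additional shuffle:

# both sides are hash-partitioned on the join key with the same partition count,
# so the subsequent join does not need an extra exchange
df_left = df_left.repartition(200, 'key')
df_right = df_right.repartition(200, 'key')
df_joined = df_left.join(df_right, on=['key'], how='inner')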

A typical example of not avoiding the shuffle but mitigating the data volume in the shuffle is the join of one large and one medium-sized data frame. If the medium-sized data frame is not small enough to be broadcast, but its key set is, we can broadcast the key set of the medium-sized data frame to filter the large data frame. In this way, drastically less data is shuffled, provided we are able to filter out a significant amount of the large data frame.

# collect the join keys of the medium-sized frame to the driver as a Python list
list_to_broadcast = df_medium.select('id').rdd.flatMap(lambda x: x).collect()
# filter the large frame by those keys before the join, so much less data is shuffled
df_reduced = df_large.filter(df_large['id'].isin(list_to_broadcast))
df_join = df_reduced.join(df_medium, on=['id'], how='inner')

  • Bucketing

Bucketing is another data organization technique that groups data with the same bucket value. It is similar to partitioning, but partitioning creates a directory for each partition, whereas bucketing distributes data across a fixed number of buckets by hashing the bucket column. The information about bucketing is stored in the metastore. Bucketing might be used with or without partitioning. An important note is that partitioning should only be used with columns that have a limited number of values; bucketing also works well when the number of unique values is large. Columns that are commonly used as keys in aggregations and joins are suitable candidates for bucketing.

By applying bucketing on appropriate columns before shuffle-requiring operations, we might avoid multiple potentially expensive shuffles. Bucketing boosts performance by shuffling and sorting the data ahead of time, before sort-merge joins are performed. It is important to have the same number of buckets on both tables in the join.

To use it, the number of buckets and the key column are specified. Needless to say, we should have solid insight into the data to decide on the correct number of buckets. In general, joins, groupBy, and distinct transformations benefit from bucketing.

# bucketBy is available on the DataFrameWriter and requires saving the result as a table
df.write.bucketBy(32, 'key').sortBy('value').mode('overwrite').saveAsTable('bucketed_table')
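As a follow-up sketch (the table names below are placeholders), bucketed tables are read back through the metastore; when both sides are bucketed on the join key with the same bucket count, the sort-merge join can skip the shuffle:

df_a = spark.table('bucketed_table_a')  # bucketed by 'key' into 32 buckets
df_b = spark.table('bucketed_table_b')  # bucketed by 'key' into 32 buckets
df_joined = df_a.join(df_b, on=['key'], how='inner')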

Are There Any Cases Where More Shuffles Are Good?

Two different scenarios might come up. The first one is about increasing the level of parallelism of the application by applying an extra shuffle. If the application cannot take advantage of all the cores in the cluster because of a low level of parallelism, repartition can be applied to increase the partition number. In this way, the application might be more performant overall despite the extra shuffle.

Secondly, when aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results. To reduce the load on the driver, an extra round of distributed aggregation can be carried out that first divides the dataset into a smaller number of partitions. The values within each partition are merged in parallel before being sent to the driver for a final round of aggregation. In this way, the computation load on the driver is alleviated.
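As an illustrative sketch of this idea (not from the original example), treeAggregate on an RDD performs exactly such a multi-level merge, combining partition results on executors before the final combine on the driver:

rdd = spark.sparkContext.parallelize(range(1_000_000), 1000)
# with depth=2, partition results are merged in an intermediate distributed round
# before the reduced set of results is sent to the driver
total = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda a, b: a + b, depth=2)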

4 — Cache Properly

Just because you can cache a data frame in memory does not mean you should do so instinctively. Keep in mind that execution memory and storage memory share a unified region. The more unnecessary caching you do, the higher the chance of spilling to disk, which hurts performance. In such cases, recomputation may be cheaper than the price paid by the increased memory pressure. Several storage levels are available in Spark; they should be chosen according to serialization, memory, and data size factors.

If a data frame will be used again and again in the following steps, it is rational to cache it at the beginning to avoid repetitive transformation loads. That is an ideal case for using cache.
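A minimal sketch of that ideal case (the frame and column names are only illustrative); MEMORY_AND_DISK spills to disk instead of dropping cached partitions when memory runs short:

from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel

df_base = df.filter(F.col('city') == 'Ankara').persist(StorageLevel.MEMORY_AND_DISK)
df_base.count()        # an action materializes the cache
# ... df_base is reused by several downstream transformations ...
df_base.unpersist()    # release storage memory when it is no longer needed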

A misuse of caching I have often observed is to cache a data frame right after reading it from a data source like Cassandra or Parquet. In that case, the entire data is cached without inspecting whether all of it is relevant. For instance, when counting rows of a Parquet source, Spark reads only the metadata, so it doesn't need to scan the entire dataset; for a filtering query, it uses column pruning and scans only the relevant columns. On the other hand, when reading the data from the cache, Spark reads the entire dataset.

As a note, if you apply even a small transformation to the data frame, like adding a new column with withColumn, the result is no longer the cached data frame. You can check the caching status of a data frame with df.storageLevel.

5 — Break the Lineage — Checkpointing

Checkpointing truncates the execution plan by saving the data frame to a temporary location on disk and reloading it back in, which would be redundant anywhere else besides Spark. In Spark, however, it comes up as a performance-boosting factor. The point is that each time you apply a transformation or perform a query on a data frame, the query plan grows. Spark keeps the whole history of transformations applied to a data frame, which can be seen by running the explain command on it. When the query plan becomes huge, performance decreases dramatically, generating bottlenecks.

In this manner, checkpointing helps to refresh the query plan and to materialize the data. It is ideal for scenarios including iterative algorithms and branching out a new data frame to perform different kinds of analytics. More tangibly, after checkpointing a data frame, you don't need to recalculate all of the previous transformations applied to it; they are persisted on disk. Note that Spark won't clean up the checkpointed data even after the SparkContext is destroyed; the clean-up needs to be managed by the application. Another nice property of checkpointing is that it helps debug the data pipeline by checking the status of intermediate data frames.

Caching is an alternative for a similar purpose to increase performance, though it obviously requires much more memory compared to checkpointing. There is a good comparison between caching and checkpointing, and when to prefer one over the other. You can take a look here.

There is also a discussion here about where to place checkpoints in the data pipeline. The location where the checkpointed data will be stored is defined on the Spark context, once the Spark session has been created.
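A minimal sketch of defining that location (the HDFS path below is just an example):

spark.sparkContext.setCheckpointDir('hdfs:///tmp/spark_checkpoints')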

# query plan without checkpoint
df = df.filter(df['city'] == 'Ankara')
df = df.join(df1, on=['job_id'], how='inner')
df.explain()

# query plan with checkpoint
df = df.filter(df['city'] == 'Ankara').checkpoint()
df = df.join(df1, on=['job_id'], how='inner')
df.explain()

6 — Avoid using UDFs

At first glance, user-defined functions (UDFs) are very useful tools for solving problems in a functional manner, and they really are. However, they come with a very high cost in PySpark. They operate one row at a time and thus suffer from high serialization and invocation overhead. In other words, they make the data move between the executor JVM and the Python interpreter, resulting in a significant serialization cost. Furthermore, after calling a Python UDF, Spark forgets how the data was distributed before. For these reasons, UDFs in PySpark inevitably reduce performance compared to UDF implementations in Java or Scala.

In this sense, avoiding unnecessary UDFs is a good practice while developing in PySpark. The built-in Spark SQL functions cover most requirements, so it is important to think twice before writing a UDF. If you still need one, consider using pandas UDFs, which are built on top of Apache Arrow. They promise low-overhead, high-performance UDFs written entirely in Python and have been supported since version 2.3. As another option to alleviate the performance bottleneck caused by UDFs, UDFs implemented in Java or Scala can also be called from PySpark.

To be clearer about the unnecessary usage of UDFs, take a look at the following example, where calculating the z-score with a UDF does not make any sense.

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Unnecessary usage of a UDF
z_score_udf = F.udf(lambda x, m, s: (x - m) / s, DoubleType())
df = df.withColumn('z_score', z_score_udf('completed_job',
                                          'mean_completed_job',
                                          'std_completed_job'))

# A better approach with built-in functions
df = df.withColumn('z_score',
                   F.round(((F.col('completed_job') - F.col('mean_completed_job')) /
                            F.col('std_completed_job')), 2))
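If a UDF is truly unavoidable, a pandas (vectorized) UDF is usually the lesser evil. Below is a hedged sketch using the Spark 3.x type-hint style for the same z-score calculation; the column names are the ones assumed above.

import pandas as pd

@F.pandas_udf(DoubleType())
def z_score_pandas(x: pd.Series, m: pd.Series, s: pd.Series) -> pd.Series:
    # operates on whole pandas Series at once instead of one row at a time
    return (x - m) / s

df = df.withColumn('z_score', z_score_pandas('completed_job',
                                             'mean_completed_job',
                                             'std_completed_job'))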

7 — Tackle Skewed Data — salting & repartition

The running time of an entire stage is directly determined by its longest-running task. If you spend enough time with Spark, you will most probably encounter a scenario where the final task takes minutes while the rest of the tasks in the stage, say 199 of them, execute in milliseconds. This is the result of the uneven distribution of data across partitions, that is, the data skewness problem. The problem might occur during intermediate stages of a Spark application. Moreover, if the data is highly skewed, it might even cause spilling of data from memory to disk. To observe the distribution of data among partitions, the glom function might be used. Additionally, uneven distribution can also be detected from the task execution times and the tasks' processed data volumes shown on the executors page of the Spark UI.

partition_number = df.rdd.getNumPartitions()
data_distribution = df.rdd.glom().map(len).collect()

Spark 3.0 comes with a nice feature, Adaptive Query Execution (AQE), which automatically balances out skewness across partitions. Apart from this, two workarounds come forward to tackle skew in the data distribution among partitions: salting and repartitioning.
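A minimal sketch of enabling it (these configuration keys exist from Spark 3.0 onwards; AQE is on by default since Spark 3.2):

spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')  # split skewed partitions during joins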

  • Salting

Salting a dataset basically means adding randomization to the data to help it distribute more uniformly. An extra processing cost is paid in return for evenly distributed data across the partitions, and hence performance gains. In aggregations and joins, all records with the same key are located in the same partition. For this reason, if one of the keys has more records than the others, the partition of that key has much more data to process. The salting technique is applied only to the skewed key: random values are appended to it so that <key1+random_salting_value> is obtained, and these new key values are matched with the replicated corresponding key values in the other table if it is a join operation.

To clarify, take a look at the following example where the key column of the join is the city information, and its distribution is highly skewed in the tables. To distribute the data evenly, we append random values from 1 to 5 to the end of the key values in the bigger table and compose a new column in the smaller table by exploding an array from 1 to 5.

Salting example in Spark

# Adding random suffixes (1-5) to the key column on the big side of the join
df_big = df_big.withColumn(
    'city',
    F.concat(df_big['city'], F.lit('_'),
             (F.floor(F.rand(seed=17) * 5) + 1).cast('string')))

# Exploding the corresponding suffixes in the other table to match the new key values
df_medium = df_medium.withColumn('city_exploded',
                                 F.explode(F.array([F.lit(i) for i in range(1, 6)])))
df_medium = df_medium.withColumn(
    'city_exploded',
    F.concat(df_medium['city'], F.lit('_'), df_medium['city_exploded'].cast('string'))) \
    .drop('city').withColumnRenamed('city_exploded', 'city')

# joining on the salted key
df_join = df_big.join(df_medium, on=['city'], how='inner')

  • Repartition

Repartition does a full shuffle, creates new partitions, and increases the level of parallelism in the application. More partitions help to deal with the data skewness problem, at the extra cost of shuffling the full data as mentioned above. However, adding one shuffle to the query plan might eliminate two other shuffles and speed up execution. Repartitioning can also be performed on specific columns, which is very useful if there are multiple joins or aggregations on those columns in the following steps.

Another approach is coalesce. Unlike repartition, which is used to increase or decrease the partition number with a shuffle, coalesce reduces the partition number without shuffling. Because of that, coalesce may not solve an imbalance in the distribution of data.

# only set the partition number
df = df.repartition(1000)
# only partition according to columns
df = df.repartition('col_1', 'col_2', 'col_3')
# repartition number and columns together
df = df.repartition(1000, 'col_1', 'col_2', 'col_3')
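And a short sketch of coalesce for comparison; it only merges existing partitions, so no full shuffle is triggered:

# merge down to 100 partitions without a shuffle, e.g. before writing output
df = df.coalesce(100)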

Apart from data skew, I highly recommend taking a look at this post, which gives examples of using repartition efficiently with concrete use cases and explains the details under the hood.

Configuring the input format to create more splits and writing the input data out to HDFS with smaller block sizes are other techniques to increase the number of partitions.

8 — Utilize Proper File Formats — Parquet

Apache Parquet is a columnar storage format designed to select only the queried columns and skip over the rest. It gives the fastest read performance with Spark. Parquet arranges data in columns, putting related values close to each other to optimize query performance, minimize I/O, and facilitate compression. Furthermore, it implements column pruning and predicate pushdown (filtering based on statistics), which simply means selecting only the required data for processing when querying a huge table. This prevents loading unnecessary parts of the data into memory and reduces network usage.

Parquet Format — Column Pruning & Predicate Pushdown

The point is to take only the relevant data from the data source, independent of what type of data source you are using, and simply prevent a full table scan. This is not directly a Spark problem, but it directly affects the performance of a Spark application.
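As a hedged sketch (the path and column names are placeholders), writing and reading Parquet so that column pruning and predicate pushdown can kick in:

df.write.mode('overwrite').parquet('/tmp/work_orders_parquet')
df_filtered = spark.read.parquet('/tmp/work_orders_parquet') \
    .select('TEAM_NO', 'JOB_STATUS') \
    .filter(F.col('JOB_STATUS') == 'OPEN')  # only the needed columns and row groups are read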

For instance, reading direct partitions is very efficient if you are using Cassandra. However, it sometimes becomes tricky. Assume the Cassandra table is partitioned by a date column and you are interested in reading the last 15 days. In such a case, reading the days one by one with the equality operator and then unioning them all together is much more performant than reading with a filter like date > current_date - 15.

from datetime import timedelta
from functools import reduce
from pyspark.sql import DataFrame, functions as F

# day_from is the starting date; 15 sequential days are queried one by one
dfs = list()
for i in range(15):
    day_i = day_from + timedelta(days=i)
    df = self.sc_session \
        .read \
        .format('org.apache.spark.sql.cassandra') \
        .options(table=self.table, keyspace=self.keyspace) \
        .load()
    df = df.filter(F.col('PARTITION_KEY_COLUMN') == day_i)  # rather than > day_i
    dfs.append(df)
# union is a narrow transformation which does not require shuffling
df_complete = reduce(DataFrame.union, dfs)

9 — Use toPandas with pyArrow

As an official definition, Apache Arrow is a cross-language development platform for in-memory data. It specifies a standardized, language-independent columnar memory format for flat and hierarchical data. More concretely, Apache Arrow is a bridge between cross-language platforms that facilitates, for example, reading a Spark data frame and writing it into Apache Cassandra without suffering enormous serialization and deserialization costs.

Apache PyArrow is the Python implementation of Arrow. It provides a Python API that brings the functionality of Arrow to the Python environment, including leading libraries like pandas and numpy. In Spark, data is processed very fast as long as it stays in the JVM. However, because of Python's rich data processing libraries, PySpark developers often move data between the Python environment and the JVM. In this sense, utilizing PyArrow while moving from a pandas data frame to a Spark data frame, or vice versa, results in a huge performance improvement.

To use PyArrow, first install it via pip or conda, then enable it in the configuration. The rest stays the same, with no change in the code. How PyArrow is used in PySpark applications, and what happens under the hood in the conversion between pandas and Spark data frames, is explained very clearly here.

pip install pyarrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")  # in Spark 3.0+, the key is spark.sql.execution.arrow.pyspark.enabled
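Once Arrow is enabled, the conversion code itself does not change; a minimal sketch, assuming pyarrow is installed:

pdf = df.toPandas()                   # Spark -> pandas, vectorized through Arrow
df_back = spark.createDataFrame(pdf)  # pandas -> Spark, also Arrow-accelerated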

TAKEAWAYS

  • Don't use count() when you don't need the exact number of rows. To check whether a data frame is empty, len(df.head(1)) > 0 is more efficient performance-wise.
  • Do not use show() in your production code.
  • It is good practice to use df.explain() to get insight into the internal representation of a data frame in Spark (the final version of the physical plan).
  • Always try to minimize the data size by filtering out irrelevant data (rows/columns) before joins.
  • Monitor Spark applications online/offline. It can give you clues about unbalanced data partitions, where jobs get stuck, and query plans. An alternative to the Spark UI might be Ganglia.
  • Basically, avoid using loops.
  • Focus on built-in functions rather than custom solutions.
  • Ensure that key columns in join operation do not include null values.
  • Put the bigger dataset on the left in joins.
  • Keep in mind that Spark uses lazy evaluation, so nothing is triggered until an action is called. That can result in confusing error messages that surface far from the code that caused them.
  • Unpersist the data in the cache, if you don't need it for the rest of the code.
  • Close/stop your Spark session when you are done with your application.
  • In Spark 3.0, significant improvements are achieved to tackle performance issues by Adaptive Query Execution, take upgrading the version into consideration.
  • Prefer data frames to RDDs for data manipulations.
  • In general, tasks larger than about 20 KiB are probably worth optimizing.
  • In general, it is recommended 2–3 tasks per CPU core in your cluster.
  • It is generally good to keep partitions around 128MB (roughly one HDFS block) to achieve parallelism.
  • CSV and JSON file formats give fast write performance but are slower for reading; Parquet, on the other hand, gives the best read performance but is slower than the other mentioned formats for writing.
  • The physical plan is read from the bottom up, whereas the DAG is read from the top down.
  • An Exchange in the plan means a shuffle occurred between stages, which is basically a performance degradation.
  • An excessive number of stages might be a sign of a performance problem.
  • Garbage collection (GC) is another key factor that might cause performance issues. Check it from the Executors tab of the Spark UI. Java GC options can typically be tuned for GC-related issues.
  • Serialization also plays an important role in the performance of any distributed application. Formats that are slow to serialize objects, or that consume a large number of bytes, will greatly slow down the computation. For Scala/Java-based Spark applications, Kryo serialization is highly recommended. In PySpark, Marshal and Pickle serializers are supported; MarshalSerializer is faster than PickleSerializer but supports fewer data types.
  • Note that you might experience a performance loss if you run Spark in a Docker environment. In our project, we observed that Spark applications take a bit longer with the same configuration settings when running in Docker.
