Make computations on large cross joined Spark DataFrames faster

Reduce the number of partitions on the input DataFrames to speed up computations on the resulting cross joined DataFrame.

Apurva Joshi
Towards Data Science

Apache Spark splits data into partitions and performs tasks on these partitions in parallel to make your computations run concurrently. The number of partitions has a direct impact on the run time of Spark computations.

Oftentimes your Spark computations involve cross joining two Spark DataFrames, i.e., creating a new DataFrame containing every combination of rows from the two input DataFrames. When cross joining large DataFrames, Spark multiplies the number of partitions of the input DataFrames. This can result in a significantly higher number of partitions in the cross joined DataFrame, and running computations on this DataFrame can be very slow due to the excessive overhead of managing many small tasks on the partitions.

This blog post demonstrates how repartitioning the large input DataFrames into a smaller number of partitions before the cross join can make computations on the resulting cross joined DataFrame faster.

Let's consider two scenarios to understand how partitioning works when cross joining DataFrames:

Scenario 1: Smaller DataFrames

If your input DataFrames are smaller in size, the cross joined DataFrame will have the same number of partitions as the input DataFrames.

scala> val xDF = (1 to 1000).toList.toDF("x")
scala> xDF.rdd.partitions.size
res11: Int = 2
scala> val yDF = (1 to 1000).toList.toDF("y")
scala> yDF.rdd.partitions.size
res12: Int = 2
scala> val crossJoinDF = xDF.crossJoin(yDF)
scala> crossJoinDF.rdd.partitions.size
res13: Int = 2

In this case,
Partitions of xDF == Partitions of yDF == Partitions of crossJoinDF

If the input DataFrames, i.e., xDF and yDF, do not have the same number of partitions, the cross joined DataFrame ends up with the same number of partitions as one of the inputs.
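
As a quick sanity check, you can repartition one of the inputs so the counts differ and inspect the result. The snippet below is a sketch; the partition count 4 and the name xDF4 are arbitrary choices for illustration.

scala> val xDF4 = xDF.repartition(4)
scala> xDF4.rdd.partitions.size
scala> xDF4.crossJoin(yDF).rdd.partitions.size  // for small inputs, should match one of the inputs' partition counts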

Scenario 2: Larger DataFrames

If we increase the data size of the input DataFrames, the partitioning behavior on the cross joined DataFrame changes.
In the following example, I have increased the number of rows in the input DataFrames from 1,000 to 1,000,000.

scala> val xDF = (1 to 1000000).toList.toDF("x")
scala> xDF.rdd.partitions.size
res15: Int = 2
scala> val yDF = (1 to 1000000).toList.toDF("y")
scala> yDF.rdd.partitions.size
res16: Int = 2
scala> val crossJoinDF = xDF.crossJoin(yDF)
scala> crossJoinDF.rdd.partitions.size
res17: Int = 4

In this scenario, the number of partitions of the cross joined DataFrame equals the product of the partition counts of the input DataFrames:

Partitions of crossJoinDF = (Partitions of xDF) * (Partitions of yDF).

If your input DataFrames have more columns or larger data types, you will be able to reproduce this behavior on DataFrames with only a few thousand rows as well.

The exact number of partitions for a DataFrame varies depending on your hardware, but the multiplication of partitions when cross joining large DataFrames is consistent across all types of hardware.

So what’s the problem if Spark is multiplying the partitions of large input DataFrames to create partitions for a cross joined DataFrame?

If your input DataFrames each contain a few hundred partitions (~100), which is typically the case when dealing with big data, the cross joined DataFrame will have partitions on the order of 10,000.

The number of DataFrame partitions has an impact on the run time of your computations:

  • If you have too few partitions, your computations will not be able to utilize all the parallelism available in the cluster.
  • If you have too many partitions, there will be excessive overhead in managing many small tasks, making your computations very slow to run.
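
Before tuning anything, it helps to see where you stand. The snippet below is a sketch using standard Spark APIs for inspecting a DataFrame's partition count and the parallelism your cluster actually offers:

scala> crossJoinDF.rdd.getNumPartitions        // partitions of the cross joined DataFrame
scala> spark.sparkContext.defaultParallelism   // total task slots available to Spark on this cluster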

Cross joining large DataFrames with a few hundred partitions each falls into the latter case, resulting in a DataFrame with too many partitions, on the order of 10,000. This makes any operation on the cross joined DataFrame very slow. You might also encounter the following exception when running operations on a cross joined DataFrame with that many partitions:

org.apache.spark.SparkException Job aborted due to stage failure: 
Total size of serialized results of 147936 tasks (1024.0 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

This occurs because Spark sends status data for each task back to the driver. With that many partitions (and therefore tasks), the total size of this data can exceed the default limit of 1024 MB.

Increasing the value of the Spark configuration spark.driver.maxResultSize will let your computation run without throwing the above exception, but it will not make it any faster; the underlying problem of too many partitions persists.
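
For completeness, the limit can be raised when the session is created. The sketch below shows one way to do it; the app name and the 4g value are arbitrary examples, not recommendations:

import org.apache.spark.sql.SparkSession

// Raising the driver result-size limit only avoids the exception;
// it does not address the underlying partition explosion.
val spark = SparkSession.builder()
  .appName("cross-join-example")  // hypothetical app name
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()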

How do you make the computation faster?

To make the computation faster, reduce the number of partitions of the input DataFrames before the cross join, so that the resulting cross joined DataFrame doesn’t have too many partitions.

As a real-world example, I needed to cross join two DataFrames, df1 and df2, to compute the cosine similarity between every combination of rows from the two DataFrames. Both DataFrames consist of a text column and an array of 500 doubles representing text embeddings. df1 contains about 60,000 rows and df2 contains about 130,000 rows. Running a count on the cross joined DataFrame takes about 6 hours on AWS Glue with 40 workers of type G.1X. Repartitioning df1 and df2 into a smaller number of partitions before the cross join reduces the time to compute the count on the cross joined DataFrame to 40 minutes!
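
The cosine similarity computation itself is not the focus of this post, but for context, here is a rough sketch of how it might look on the cross joined DataFrame. The renamed columns and the UDF below are illustrative assumptions, not the exact production code:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical cosine similarity between two embedding vectors (arrays of doubles).
val cosineSimilarity = udf { (a: Seq[Double], b: Seq[Double]) =>
  val dot   = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dot / (normA * normB)
}

// Rename df2's columns to avoid ambiguity after the cross join.
val similarityDF = df1.crossJoin(
    df2.withColumnRenamed("text", "text_2")
       .withColumnRenamed("text_embedding", "text_embedding_2"))
  .withColumn("similarity", cosineSimilarity(col("text_embedding"), col("text_embedding_2")))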

For the purpose of illustration, I will take a small sample of the df1 and df2 DataFrames. Cross joining df1, containing 17,000 rows and 200 partitions, with df2, containing 15,000 rows and 200 partitions, creates a cross joined DataFrame with 40,000 partitions. Running a count on the cross joined DataFrame takes 285,163,427,988 ns, i.e., about 4.75 minutes.

The following code was executed on AWS Glue with 40 workers of type G.1X, running Spark 2.4.
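
The time helper used in the snippets below is not part of the Spark shell; a minimal definition consistent with the "Elapsed time" output shown (my assumption of what was used) is:

scala> def time[R](block: => R): R = {
     |   val t0 = System.nanoTime()
     |   val result = block            // run the computation being measured
     |   val t1 = System.nanoTime()
     |   println("Elapsed time: " + (t1 - t0) + "ns")
     |   result
     | }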

scala> df1.count()
res73: Long = 17000
scala> df1.show(4)
+---------------------------+--------------------+
| text| text_embedding|
+---------------------------+--------------------+
|eiffel tower location |[0.4, 0.02, 0.1, ...|
|pounds kilogram conversion |[0.01, 0.2, 0.1, ...|
|capital of spain |[0.05, 0.2, 0.2, ...|
|mount everest height |[0.07, 0.1, 0.1, ...|
+---------------------------+--------------------+
scala> df1.rdd.partitions.size
res74: Int = 200
scala> df2.count()
res75: Long = 15000
scala> df2.rdd.partitions.size
res76: Int = 200
scala> df2.show(4)
+------------------------------+--------------------+
| text| text_embedding|
+------------------------------+--------------------+
|where is eiffel tower located |[0.3, 0.01, 0.1, ...|
|how many pounds in a kilogram |[0.02, 0.2, 0.1, ...|
|what is the capital of spain |[0.03, 0.2, 0.2, ...|
|how tall is mount everest |[0.06, 0.1, 0.1, ...|
+------------------------------+--------------------+
scala> val finalDF = df1.crossJoin(df2)
scala> finalDF.rdd.partitions.size
res77: Int = 40000
scala> time {finalDF.count()}
Elapsed time: 285163427988ns
res78: Long = 255000000

If we reduce the number of partitions of df1 and df2 to 40 before executing the cross join, the time to run a count on the cross joined DataFrame drops to 47,178,149,994 ns, i.e., about 47 seconds! We chose 40 partitions to utilize all the parallelism available in the 40-worker cluster.

scala> val df1 = df1.repartition(40)
scala> df1.rdd.partitions.size
res80: Int = 40
scala> val df2 = df2.repartition(40)
scala> df2.rdd.partitions.size
res81: Int = 40
scala> val finalDF = df1.crossJoin(df2)
scala> finalDF.rdd.partitions.size
res82: Int = 1600
scala> time {finalDF.count()}
Elapsed time: 47178149994ns
res86: Long = 255000000

Lowering the number of partitions before cross joining the DataFrames reduces the time to compute the count on the cross joined DataFrame by about 6x on this sample data!
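
There is no universal magic number for the repartition target. One reasonable starting point, sketched below under the assumption that you tune from there, is to size each input to the parallelism Spark reports for the cluster; the variable names are illustrative:

// Start from the cluster's available parallelism; the product of the two inputs'
// partition counts determines the partitions of the cross joined DataFrame,
// so keep each side small enough that the product stays manageable.
val targetPartitions = spark.sparkContext.defaultParallelism
val tunedCrossDF = df1.repartition(targetPartitions)
  .crossJoin(df2.repartition(targetPartitions))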

Take-away

Next time you find Spark computations slow after performing a cross join, do check the number of partitions on the cross joined DataFrame. If it has too many partitions, reduce the number of partitions on the input DataFrames to speed up operations on the resulting cross joined DataFrame.
