
Spark & Databricks: Important Lessons from My First Six Months

…and how you can avoid them

Getting Started

Photo by Kristopher Roller on Unsplash

If you’re reading this article then perhaps, like me, you have just started a new tech job and are trying to leverage Spark and Databricks for big data operations. Whilst Databricks has a friendly-looking UI that masks the complex internal workings of Spark, do not be fooled: there are many traps and pitfalls which new users can fall into. These can lead to highly inefficient coding practices, causing ‘hanging’ operations or inexplicable errors that will leave you scratching your head.

In my first six months of using Spark, I learned two very important lessons which drastically improved the performance of my code and helped me to program with a mindset oriented around distributed computing. I would like to share these lessons with you to help develop your own understanding and potentially fast track you through some problems you may currently be facing in your work.

I will illustrate these lessons through the problems caused, some of the theory behind them, and some practical usage examples which could aid in the understanding of these common Spark issues.


1. Understanding Partitions

1.1 The Problem

Perhaps Spark’s most important feature for data processing is its DataFrame structure. These structures can be accessed in a similar manner to a Pandas DataFrame, for example, and support a PySpark API interface that enables you to perform most of the same transformations and functions.

However, treating a Spark DataFrame in the same manner as a Pandas DataFrame is a common mistake, as it means that a lot of Spark’s powerful parallelism is not leveraged. Whilst you may be interacting with a DataFrame variable in your Databricks notebook, this does not exist as a single object on a single machine; in fact, the physical structure of the data is vastly different under the surface.

When first starting to use Spark you may find that some operations are taking an inordinate amount of time when you feel that quite a simple operation or transformation is being applied. A key lesson to help with this problem, and understanding Spark in earnest, is learning about partitions of data and how these exist in the physical realm as well as how operations are applied to them.

1.2 The Theory

Beneath Databricks sits Apache Spark, a unified analytics engine designed for large-scale data processing, which boasts up to 100x the performance of the now somewhat outdated Hadoop MapReduce. It utilises a cluster computing framework that enables workloads to be distributed across multiple machines and executed in parallel, giving great speed improvements over using a single machine for data processing.

Distributed computing is the single biggest breakthrough in data processing since limitations in computing power on a single machine have forced us to scale out rather than scale up.

Nevertheless, whilst Spark is extremely powerful, it must be used correctly in order to gain the maximum benefit for big data processing. This means changing your mindset from one where you may have been dealing with single tables sitting in a single file on a single machine, to this massively distributed framework where parallelism is your superpower.

In Spark, you will often be dealing with data in the form of DataFrames: an intuitive, easy-to-access structured API that sits above Spark’s core, fundamental data structure known as RDDs (Resilient Distributed Datasets). These are logical collections of data partitioned across machines (distributed) that can be regenerated from their logical set of operations even if a machine in your cluster is down (resilient). The Spark SQL and PySpark APIs make interaction with these low-level data structures very accessible to developers with experience in those languages; however, this can lead to a false sense of familiarity, as the underlying data structures themselves are so different.

A dataset in Spark does not exist on a single machine but is held as an RDD spread across multiple machines in the form of partitions. So although you may be interacting with a DataFrame in the Databricks UI, this actually represents an RDD sitting across multiple machines. Subsequently, when you call transformations, it is key to remember that these are not instructions applied locally to a single file; in the background, Spark’s Catalyst optimiser is planning how these operations can be performed in the most efficient way across all partitions.

Figure 1 – Partitioned Datasets (image by the author)

Taking the partitioned table in Figure 1 as an example: if a filter was called on this table, the Driver would send instructions to each of the workers to filter each coloured partition in parallel, before combining the results to form the final output. As you can see, for a huge table split into 200+ partitions, the speed benefit is drastic when compared to filtering a single unpartitioned table.

The number of partitions an RDD has determines the parallelism that Spark can achieve when processing it, as Spark can run one concurrent task for every partition. Whilst you may be using a 20-core cluster, if your DataFrame exists as only one partition, your processing speed will be no better than if the processing was performed by a single machine, and Spark’s speed benefits will not be observed.

1.3 Practical Usage

This idea can be confusing at first and requires a switch in mindset to one of distributed computing. Once you make that switch, it becomes easy to see why some operations take much longer than expected. A good example of this is the difference between narrow and wide transformations. A narrow transformation is one in which each input partition maps to at most one output partition, for example a .filter()/.where(), in which each partition is searched for the given criteria independently and contributes to at most one output partition.

Figure 2 – Narrow transformation mapping (image by the author)

A wide transformation is a much more expensive operation and involves what is referred to as a shuffle in Spark. A shuffle goes against the ethos of Spark, which is that moving data should be avoided at all costs, as this is the most time-consuming and expensive aspect of any data processing. However, in many instances a wide transformation is obviously necessary, such as when performing a .groupBy() or a join.

Figure 3 – Wide transformation mapping (image by the author)

In a narrow transformation, Spark will perform what is known as pipelining, meaning that if multiple filters are applied to the DataFrame then these will all be performed in memory, one after another, on each partition. This is not possible for wide transformations: the shuffle writes intermediate results to disk, causing the operation to be much slower.

This concept forces you to think carefully about how to achieve different outcomes with the data you are working with and how to most efficiently transform data without adding unnecessary overhead.

There are also some practical ways in which you can use partitioning to your benefit. These include .partitionBy() (applied when writing a table) and .repartition() (applied to a DataFrame). By controlling the size and form of the partitions used in a table, operation speeds can increase dramatically (think indexes in SQL). Both of these operations add overhead to your processes, but by partitioning on a given column or set of columns, filters can become a lot quicker. This is most beneficial if you know that a certain column is going to be used extensively for filtering.


2. Spark is Lazy… Really Lazy!

2.1 The Problem

The feature of Spark that is most frustrating to a new user is its lazy evaluation, as it goes against everything you have previously taken for granted in programming. Many developers have spent many hours in a code editor, setting breakpoints and stepping through code to understand what is happening at each step as the logical order of code progresses. Similarly, in a Jupyter Notebook, it is easy to run each cell and know the exact state of variables and whether processes have been successful or not.

The issue is that when you call a transformation on a Pandas DataFrame in Jupyter, it is carried out instantaneously and the transformed variable sits there in memory, ready to be accessed by the user. Conversely, in Spark, transformations are not applied as soon as they are called; instead, a plan of transformations is built up, ready to be applied only when the results are actually required.

To the new user, this can lead to the confusing scenarios of:

  • Complex operations in a Databricks cell taking only a matter of milliseconds.
  • Code exiting with errors at unexpected points.

2.2 The Theory

This feature is known as lazy evaluation, and whilst it is hard to get used to, it is actually one of the key design decisions that makes Spark so fast, boasting up to 100x the speed of technologies like Hadoop. Moving data is computationally expensive, and if after each transformation the intermediate table had to be written to disk, then the overall process would take a long time – especially with large tables.

In Spark, there are two different types of operations that can be called: transformations and actions. Transformations are, as the name suggests, operations applied to DataFrames that modify them in some way to present the data in a different form. What is different about Spark is that when these transformations are called, rather than the necessary computation actually being applied, Spark builds up an optimised Physical Plan, ready to execute your code when an action is called. An action is a command such as .count(), .collect() or .save() that actually requires the data to be computed.

The process is as follows:

  1. Write DataFrame/Dataset/SQL Code.
  2. If this code is valid, it is converted to a Logical Plan (logical operations).
  3. Spark transforms this Logical Plan to a Physical Plan (how these operations will be carried out on the cluster).
  4. Spark then executes this Physical Plan (RDD manipulations) on the cluster (when an action is called).

This process allows Spark to perform optimisations within the plan before any code is actually run. This includes operations such as predicate pushdown, where a filter that is applied at the end of the set of transformations in code is pushed to the front of the physical plan, ensuring that subsequent transformations are applied to a smaller set of data and are therefore faster.

2.3 Practical Usage

In practice, this problem often manifests as simple operations causing large, hard-to-digest errors at runtime, frequently after a long list of transformations has been applied to a DataFrame or RDD. This makes Spark notoriously hard to debug for new users, as it can be very hard to identify exactly which operation caused a pipeline to fail; typical debugging methods like print statements and breakpoints lose all meaning when the code they intersect has not actually been executed.

Figure 4 – Example Process (image by the author)

Take Figure 4 for example. In this simple pipeline, an aggregate function is applied to test_df and then joined to a second df (transformations), before a collect action is called. As previously discussed, the transformations will be used to generate a Logical (and then Physical) Plan, which will be executed when required. At stage 1, however, the transformed DataFrame does not exist – it is merely an idea of what you want the data to look like; this means that any prints or success checks applied at this stage are actually not testing whether the code is successful, they just assert that the logical plan has been created.

The same is true of new_df, as the join to this table is just another step in the logical plan – so again, at this stage, you cannot say for certain whether the code is successful, although it may appear so in Databricks. This also explains why joining two tables with millions (or more) rows takes only a second in a Databricks cell: it is merely adding a step to the plan.

When we get to stage 3, however, and call a collect, the physical plan is executed on the cluster and all of the desired operations occur. It is at this point that, if any of the actual operations are invalid, errors will be thrown.

Remember: the logical plan only checks that your code is valid; the execution of the physical plan is what reveals errors in your operations.

In the simple example above, you can probably see how it would be easy to trace the error back to its source; however, in lengthy pipelines this can become convoluted, especially as the physical plan may not follow the same order as the logical one.

To overcome this problem and debug effectively, it is necessary to isolate operations and test thoroughly before building huge pipelines, as otherwise issues will only surface much later when you begin to read and write the data. Further to this, it can be highly beneficial to generate small test datasets to check the expected behaviour of functions and operations. If you have to read in huge tables and collect them to the driver just to test your functionality, your debugging time will increase drastically.


Conclusion

In conclusion, Spark is an amazing tool that has made data processing at scale much quicker and simpler than ever before. Despite it requiring a change of mindset to leverage it properly, it is definitely worth gaining a deeper understanding of its inner workings if for no other reason than realising its ingenious design.

With Spark it is definitely useful to understand how it is working under the hood as it is so different from any other technologies in use. With Databricks providing such a low barrier to entry when using Spark, it is easy to start using bad practices early on causing large cluster bills and long run times. But with some education its potential can be truly exploited and will lead to huge improvements in efficiency and performance.

