Data in an enterprise environment tends to grow without bound over time. For example:
- Transactions in an online store accumulating at a fast pace.
- Log data from an application running on a server.
- Clickstream data from millions of online shoppers.
- IoT sensor data from hundreds of interconnected devices.
In all of these cases, the common thread is that data keeps accumulating and growing in volume over time, and it needs to be processed efficiently.
What is Incremental Data Processing?
At a high level, incremental data processing is the movement of new or fresh data from a source to a destination.
Incremental ETL is one of the most important patterns in a data lake architecture. Reliably loading only new data reduces cost and helps data pipelines scale efficiently.
Most data pipelines these days start with some form of file ingestion process from cloud storage.
Historically, ingesting files into a data lake has been complicated: many systems are configured to process every file in a directory, or engineers have to write custom logic to identify new data. That custom logic is often complex, and the de facto alternative of reprocessing all data is expensive.
What if you could efficiently load new data without spending a fortune re-reading everything or conjuring up your own processing framework? That is now possible.
What is Databricks Auto Loader?
Auto Loader is an out-of-the-box solution that lets you incrementally load new data files from your cloud storage without any complex setup.
Under the hood, Auto Loader uses the Spark Structured Streaming API to read new data from a source type called cloudFiles.
If you are familiar with Spark read and write operations, Auto Loader implementation is very intuitive.
Using only the following four arguments, you can configure and set up an incremental data processing pipeline with Auto Loader (a minimal sketch follows the list).
- data source – the directory of the source data. Auto Loader automatically ingests files as they arrive in this location.
- source format – cloudFiles is the format that activates Auto Loader; in addition, the option cloudFiles.format specifies the format of the source files themselves.
- table – the destination table for the ingested data, specified with the .table() method.
- checkpoint directory – tells Auto Loader where to store its checkpoints. Checkpoints are the bookkeeping mechanism of Spark Structured Streaming: they track the streaming progress and can also track the schema of the dataset.
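To make these four arguments concrete, here is a minimal sketch of an Auto Loader pipeline in PySpark. The paths and table name are placeholders, not values from this tutorial; replace them with your own.

```python
# Minimal Auto Loader sketch; the paths and table name are placeholders.
(
    spark.readStream
        .format("cloudFiles")                                      # source format: tells Spark to use Auto Loader
        .option("cloudFiles.format", "csv")                        # format of the source files themselves
        .option("cloudFiles.schemaLocation", "<checkpoint-dir>")   # checkpoint dir also tracks the inferred schema
        .load("<source-data-dir>")                                 # data source directory
        .writeStream
        .option("checkpointLocation", "<checkpoint-dir>")          # checkpoint directory for streaming progress
        .table("<target-table>")                                   # destination table
)
```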
How to ingest data incrementally using Auto Loader?
To follow along, all you need is a Databricks workspace and a cluster to run your notebook.
Let’s get acquainted with the sample flight delays CSV data we will use for this tutorial. The GitHub repository contains three CSV files with flight delays for three different dates, along with a follow-along notebook.
The dataset has the following schema:
The Scenario
Now imagine you are a Data Engineer responsible for creating and maintaining a data lake table that stores flight delays. Each day the source system uploads the previous day’s flight delays into a cloud storage directory. Your task is to process this data daily in an optimal way.
The Design
The schematic below represents the implementation. As the saying goes, a picture is worth a thousand words; in Data Engineering, a visual representation usually puts things into perspective better than a whole page of text.
I like to call this the pseudo-code of Data Engineering!
As you can see in the illustration above, this solution may be used to represent any generic file ingestion scenario where data is accumulated over time in a data lake for further use.
Implementation
To follow along with this tutorial, download the CSV files from the git repo, since we will use them to simulate the behavior of the source system.
To keep the example simple and easy to replicate, we will not use a cloud storage service for this tutorial and will stick to the built-in Databricks File System (DBFS).
Ideally, in a production setup, the source system sends files to some kind of landing/staging cloud storage directory.
Let’s turn the pseudo-code into actual implementation.
Prepare an empty notebook for the ETL code. The first step of the ETL process is to read the data from the source directory. The syntax is identical to a traditional Spark read, with a few minor tweaks.
Extract – We read the data as a stream because Auto Loader uses Spark Structured Streaming under the hood. Notice that we specify cloudFiles as the format to signal that we want to use Auto Loader. As discussed earlier, we also provide a checkpoint directory to serve as the storage location for the bookkeeping functionality.
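A sketch of the extract step is below. The source and checkpoint paths are assumptions for this tutorial setup (files uploaded under /FileStore/tables/), as is the assumption that the CSVs have a header row; adjust these to match your environment.

```python
# Extract: read the source directory as a stream using Auto Loader.
# These paths are assumptions for this tutorial; adjust them to your setup.
source_dir = "/FileStore/tables/flight_delays/"
checkpoint_dir = "/FileStore/checkpoints/flight_delays/"

raw_df = (
    spark.readStream
        .format("cloudFiles")                                  # activates Auto Loader
        .option("cloudFiles.format", "csv")                    # the source files are CSVs
        .option("cloudFiles.schemaLocation", checkpoint_dir)   # checkpoint dir also stores the inferred schema
        .option("header", "true")                              # assumes the CSV files have a header row
        .load(source_dir)
)
```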
Transform – An ETL without a transformation is boring. Spark’s schema inference reads the first column in our dataset as a timestamp, so we apply a simple transformation to convert that column to a date type.
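A sketch of the transformation, looking up the first column from the inferred schema rather than hard-coding its name:

```python
from pyspark.sql.functions import col, to_date

# Transform: convert the first (timestamp) column to a date type.
first_col = raw_df.columns[0]
transformed_df = raw_df.withColumn(first_col, to_date(col(first_col)))
```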
Load – For the load stage, we simply call the Spark writeStream method and pass the checkpoint location and the name of the target table where we want the data to be stored. The target table is a Delta table; the execution will create a Delta table called DailyDelays if it does not already exist.
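A sketch of the load step, reusing the checkpoint directory defined above and writing into the DailyDelays Delta table:

```python
# Load: stream the transformed data into the DailyDelays Delta table.
# The table is created automatically on the first run if it does not exist.
query = (
    transformed_df.writeStream
        .option("checkpointLocation", checkpoint_dir)
        .table("DailyDelays")
)
```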
Now that the ETL code is ready, let’s begin the show. To simulate a source system uploading files to a source directory, upload the first CSV file (found in the git repo) using the file upload option available in the workspace itself. You can find this under Workspace Menu → Data → Create Table. Remember to add a suffix to the default path of /FileStore/tables/ as shown below, and upload the first file.
Once the file is uploaded, run the cell with the ETL code. If everything goes well, you will see the streaming query become active, as shown below. This means the first CSV file is already being processed.
To confirm that the data was ingested successfully, query the target Delta table from a different notebook, and you should already see results.
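For example, in another notebook you can inspect the table with Databricks’ display helper (or a plain SQL SELECT):

```python
# Sanity check from a separate notebook: inspect the rows ingested so far.
display(spark.table("DailyDelays"))
```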
Now upload the 2nd CSV file into the same directory and watch the magic unfold. Without adjusting any part of the code, the streaming query immediately picks up the new file and ingests it into the same target table. You can confirm this by querying the target table again. As mentioned earlier, each file contains delays for a single day, so if you now find two different dates in the table, Auto Loader has already worked its magic.
If you still can’t believe it, do one more sanity check: upload the 3rd file, and within a few seconds the target table should contain data from three different dates.
Note: If you are wondering why the cell executing the code keeps running and never stops, it is because it runs an active streaming query. By default, a Spark streaming query triggers roughly every 500ms, which amounts to continuous processing of data as it arrives. This is very helpful when you need low latency and near real-time ingestion, but be aware that it means your cluster has to stay on.
Of course, every case is different. In our example, data arrives only once per day, so it makes little sense to keep this query running all the time; it would be more efficient to run it once a day while still processing only the new files that arrived since the previous run.
This can be achieved with a simple change to our existing writeStream logic. By introducing the trigger-once option, we keep all of this powerful functionality, but the query executes a single time and then stops automatically.
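A sketch of the adjusted load step; only the trigger line changes. (Newer runtimes also offer trigger(availableNow=True), which behaves similarly while respecting any rate-limit options.)

```python
# Load with a one-shot trigger: process all new files since the last run, then stop.
query = (
    transformed_df.writeStream
        .trigger(once=True)                             # run a single batch and stop automatically
        .option("checkpointLocation", checkpoint_dir)
        .table("DailyDelays")
)
```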
Conclusion
Incremental data processing is becoming the go-to method for most data pipelines, and Auto Loader together with Spark Structured Streaming, used wisely, can serve this purpose in every stage of a pipeline. It is no surprise that many vendors now provide this as built-in functionality, recognizing the need for an efficient data processing method.
It has several advantages:
- Faster processing because there is less data.
- Consistent performance since only changed data is processed.
- Lower cost, since fewer resources are needed to process the data.
Go ahead and give Auto Loader a chance and I can guarantee you will never look back!
The CSV files along with the helper notebook can be found in the git repo.