
Databricks Delta Lake – Database on top of a Data Lake – Part 2

Part 2 of 2 – Understanding the Basics of Databricks Delta Lake – Partitioning, Schema Evolution, Data Lineage & Vacuum

Image by Gerd Altmann from Pixabay

In Part 1 we explored how Delta Lake features like ACID Transactions, Checkpoints, the Transaction Log & Time Travel can positively impact change data capture, processing, and management. In this article, we will continue to advance our understanding with some more advanced features: Partitioning, Schema Evolution, Data Lineage & Vacuum.

Data Provenance/Data Lineage

Data goes through a journey from ingestion to curation and transformation. This journey is termed Data Lineage. These days regulatory agencies impose very strict guidelines for tracing and auditing data, so validating data lineage is a critical activity in fulfilling compliance and governance requirements.

Delta Lake stores data lineage information for each write to a table for 30 days.

Start by cloning the repository that contains the demo notebook:

$ git clone https://github.com/mkukreja1/blogs.git

The complete notebook is available at /delta_lake/delta_lake-demo-2.ipynb. Let me run through each step below with explanations:

from delta.tables import DeltaTable

# Load the Delta table and display its full change history (data lineage)
deltaTable = DeltaTable.forPath(spark, "hdfs:///delta_lake/products")
df_history = deltaTable.history()
df_history.show(20, False)
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp              |userId|userName|operation             |operationParameters                                                      |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                           |userMetadata|
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|12     |2020-09-01 16:48:45.792|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 270)]                                     |null|null    |null     |11         |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|11     |2020-09-01 16:48:43.906|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 280)]                                     |null|null    |null     |10         |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|10     |2020-09-01 16:48:41.15 |null  |null    |UPDATE                |[predicate -> (ProductID#529 = 260)]                                     |null|null    |null     |9          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|9      |2020-09-01 16:48:39.497|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 200)]                                     |null|null    |null     |8          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|8      |2020-09-01 16:48:37.695|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 240)]                                     |null|null    |null     |7          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|7      |2020-09-01 16:48:35.437|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 220)]                                     |null|null    |null     |6          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|6      |2020-09-01 16:48:33.499|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 250)]                                     |null|null    |null     |5          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|5      |2020-09-01 16:48:31.559|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 210)]                                     |null|null    |null     |4          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|4      |2020-09-01 16:48:29.492|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 230)]                                     |null|null    |null     |3          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]                                                                                                                        |null        |
|3      |2020-09-01 16:48:26.544|null  |null    |MERGE                 |[predicate -> (products.`ProductID` = products_new.`ProductID`)]         |null|null    |null     |2          |null          |false        |[numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 10, numTargetRowsInserted -> 5, numTargetRowsUpdated -> 4, numOutputRows -> 9, numSourceRows -> 9, numTargetFilesRemoved -> 1]|null        |
|2      |2020-09-01 16:48:19.493|null  |null    |DELETE                |[predicate -> ["(`ProductID` = 210)"]]                                   |null|null    |null     |1          |null          |false        |[numRemovedFiles -> 1, numDeletedRows -> 1, numAddedFiles -> 1, numCopiedRows -> 4]                                                                                                                        |null        |
|1      |2020-09-01 16:48:12.635|null  |null    |UPDATE                |[predicate -> (ProductID#529 = 200)]                                     |null|null    |null     |0          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 4]                                                                                                                        |null        |
|0      |2020-09-01 16:47:31.819|null  |null    |CREATE TABLE AS SELECT|[isManaged -> false, description ->, partitionBy -> [], properties -> {}]|null|null    |null     |null       |null          |true         |[numFiles -> 1, numOutputBytes -> 1027, numOutputRows -> 5]                                                                                                                                                |null        |
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+

There is no need to fear data audits, because every change made to the data is backed up by a full audit trail.
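
By default this history is retained for 30 days. If your audit requirements call for a longer window, the retention is controlled by a table property. Here is a minimal sketch, assuming the table is registered in the metastore as products and that the Delta SQL extensions are enabled in your Spark session:

# Sketch: extend the transaction log (history) retention from the default 30 days
spark.sql("ALTER TABLE products SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 60 days')")

# Confirm the property took effect
spark.sql("SHOW TBLPROPERTIES products").show(truncate=False)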

A simple look at the last operation on a table is as follows:

df_lastOperation = deltaTable.history(1)
df_lastOperation.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|     12|2020-09-01 16:48:...|  null|    null|   UPDATE|[predicate -> (Pr...|null|    null|     null|         11|          null|        false|[numRemovedFiles ...|        null|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
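
If you prefer SQL, the same lineage is available through DESCRIBE HISTORY. A minimal sketch, assuming Delta Lake 0.7.0+ with the Delta SQL extensions enabled:

# Sketch: query the audit trail through Spark SQL instead of the Python API
spark.sql("DESCRIBE HISTORY products").select("version", "timestamp", "operation", "operationParameters").show(truncate=False)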

Schema Evolution – Detecting and Adapting to Schema Changes

People who have been creating data pipelines will surely relate to this issue. We create and deploy our pipelines with a given schema, usually supplied to us at the start of the project. Everything works fine for a while until one day the pipelines error out: it turns out that the schema of the incoming files has changed. Personally, I have been hit by this issue many times. The very first time, extensive damage was done to the data because we were not diligent enough to catch the change before it corrupted the data. I still remember spending endless hours dealing with the damage: fixing code and backtracking data.

I learnt an important lesson that day – VALIDATE SCHEMA BEFORE INGESTING DATA. It’s a topic for another post in the future. For now, let’s focus on how Delta Lake can help.

Delta Lake can safeguard data by enforcing schema validation at write time. What does that mean?

  1. It means that at write time the schema of the newly incoming data is compared to the schema of the existing data.
  2. If a discrepancy is found, the transaction is cancelled: no data is written to storage and an exception is raised for the user.

Now let's see it in action. Here is the last file we had ingested previously:

products_aug21.csv

And this is the new file from the next day. Notice the schema of the file has changed – a new column Quantity has been added.

products_aug22.csv

Let's review the PySpark code:

# Read the new file, which carries the extra Quantity column
df_productsaug22 = spark.read.csv('hdfs:///delta_lake/raw/products_aug22.csv', header=True, inferSchema=True)
df_productsaug22.show()

# Attempt to append the new data to the existing Delta table
deltaTable = DeltaTable.forPath(spark, "hdfs:///delta_lake/products")
df_productsaug22.write.format("delta").mode("append").save("hdfs:///delta_lake/products")
AnalysisException                         Traceback (most recent call last)
<ipython-input-15-85affcb142df> in <module>
----> 1 df_productsaug22.write.format("delta").mode("append").save("hdfs:///delta_lake/products")

/opt/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    825             self._jwrite.save()
    826         else:
--> 827             self._jwrite.save(path)
    828 
    829     @since(1.4)

~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/opt/spark/python/pyspark/sql/utils.py in raise_from(e)

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 320f5591-72dd-4f4c-bdac-38f560e90dba).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- ProductID: integer (nullable = true)
-- Date: string (nullable = true)
-- Price: double (nullable = true)

Data schema:
root
-- ProductID: integer (nullable = true)
-- Date: string (nullable = true)
-- Price: double (nullable = true)
-- Quantity: integer (nullable = true)

         ;

Delta Lake promptly rejects the new file because of a schema mismatch. Pretty cool.

What if you are fine with the new column, i.e. you actually want to ingest the data even though the schema has changed? Go ahead as shown below using option("mergeSchema", "true").

df_productsaug22.write.format("delta").mode("append").option("mergeSchema", "true").save("hdfs:///delta_lake/products")
spark.table("products").show()
+---------+----------+------+--------+
|ProductID|      Date| Price|Quantity|
+---------+----------+------+--------+
|      200|2020-08-22|  25.5|       2|
|      210|2020-08-22|  46.0|       5|
|      220|2020-08-22| 34.56|       6|
|      230|2020-08-22| 23.67|      11|
|      200|2020-08-20|  25.5|    null|
|      250|2020-08-21| 99.76|    null|
|      230|2020-08-20| 23.67|    null|
|      210|2020-08-21|  46.0|    null|
|      220|2020-08-20| 34.56|    null|
|      260|2020-08-21| 64.55|    null|
|      280|2020-08-21| 54.78|    null|
|      270|2020-08-21|106.32|    null|
|      240|2020-08-20|100.82|    null|
+---------+----------+------+--------+

The new column is now part of the Delta Lake metadata. Notice that the Quantity values for the previously ingested data (before 2020-08-22) have been set to null.
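
A quick way to confirm that the schema has evolved is to print it from the table we registered earlier:

# The merged schema now carries the new Quantity column
spark.table("products").printSchema()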

Partitioning

To appreciably improve query performance in Delta Lake, you should consider creating a table that is partitioned by a column. Choosing the right partitioning column is very important.

  1. Choose a column that has low cardinality, like a date column, and definitely not a sequential ID column.
  2. Choose a column that results in evenly sized partitions, each ideally holding a large amount of data (> 1 GB).

# Read all raw files and write them out as a Delta table partitioned by Date
df_productsaug_partition = spark.read.csv('hdfs:///delta_lake/raw/*.csv', header=True, inferSchema=True)
df_productsaug_partition.write.format("delta").partitionBy("Date").option("path", "hdfs:///delta_lake/products_p").saveAsTable("products_p")
$ hadoop fs -ls /delta_lake/products_p
Found 4 items
drwxr-xr-x   - mkukreja supergroup          0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-20
drwxr-xr-x   - mkukreja supergroup          0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-21
drwxr-xr-x   - mkukreja supergroup          0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-22
drwxr-xr-x   - mkukreja supergroup          0 2020-09-01 17:19 /delta_lake/products_p/_delta_log

Notice that Delta Lake has created a partitioned folder structure based on the Date column.
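
Queries that filter on the partitioning column benefit from partition pruning: only the matching folders are scanned. Here is a minimal sketch against the products_p table created above:

# Sketch: only the Date=2020-08-21 partition folder is read for this query
spark.table("products_p").where("Date = '2020-08-21'").show()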

Vacuum

Delta Lake implements data versioning so that it can supply older versions of the data on demand. Storing multiple versions of the same data can get expensive over time. Therefore, Delta Lake includes a cleanup mechanism called vacuum that deletes old versions of the data.

# Remove data files that are no longer referenced by the table and are older
# than the default retention threshold (7 days)
deltaTable.vacuum()
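
You can also pass an explicit retention period in hours. A minimal sketch; note that going below the default 7-day retention requires disabling Delta's safety check, which you should only do if you are certain no jobs still need the older versions:

# Sketch: delete unreferenced data files older than 30 days (720 hours)
deltaTable.vacuum(720)

# Going below the 7-day default requires turning off the safety check first:
# spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# deltaTable.vacuum(24)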

I hope this article was helpful. Delta Lake is covered as part of the Big Data Hadoop, Spark & Kafka course offered by Datafence Cloud Academy. The course is taught online by me on weekends.

