
In Part 1 we explored how Delta Lake features like ACID Transactions, Checkpoints, the Transaction Log & Time Travel can positively impact Change Data Capture, processing and management. In this article, we will continue to advance our understanding with more advanced features: Partitioning, Schema Evolution, Data Lineage & Vacuum.
Data Provenance/Data Lineage
Going from ingestion to curation and transformation, data goes through a journey. This journey is termed Data Lineage. These days regulatory agencies impose very strict guidelines for tracing and auditing data, so validating data lineage is a critical activity in fulfilling compliance and governance requirements.
Delta Lake stores data lineage information for each write to a table for 30 days.
$ git clone https://github.com/mkukreja1/blogs.git
The complete notebook is available at /delta_lake/delta_lake-demo-2.ipynb. Let me run through each step below with explanations:
from delta.tables import DeltaTable

# Load the Delta table and display its full commit history (the audit trail)
deltaTable = DeltaTable.forPath(spark, "hdfs:///delta_lake/products")
df_history = deltaTable.history()
df_history.show(20, False)
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|12 |2020-09-01 16:48:45.792|null |null |UPDATE |[predicate -> (ProductID#529 = 270)] |null|null |null |11 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|11 |2020-09-01 16:48:43.906|null |null |UPDATE |[predicate -> (ProductID#529 = 280)] |null|null |null |10 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|10 |2020-09-01 16:48:41.15 |null |null |UPDATE |[predicate -> (ProductID#529 = 260)] |null|null |null |9 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|9 |2020-09-01 16:48:39.497|null |null |UPDATE |[predicate -> (ProductID#529 = 200)] |null|null |null |8 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|8 |2020-09-01 16:48:37.695|null |null |UPDATE |[predicate -> (ProductID#529 = 240)] |null|null |null |7 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|7 |2020-09-01 16:48:35.437|null |null |UPDATE |[predicate -> (ProductID#529 = 220)] |null|null |null |6 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|6 |2020-09-01 16:48:33.499|null |null |UPDATE |[predicate -> (ProductID#529 = 250)] |null|null |null |5 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|5 |2020-09-01 16:48:31.559|null |null |UPDATE |[predicate -> (ProductID#529 = 210)] |null|null |null |4 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|4 |2020-09-01 16:48:29.492|null |null |UPDATE |[predicate -> (ProductID#529 = 230)] |null|null |null |3 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0] |null |
|3 |2020-09-01 16:48:26.544|null |null |MERGE |[predicate -> (products.`ProductID` = products_new.`ProductID`)] |null|null |null |2 |null |false |[numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 10, numTargetRowsInserted -> 5, numTargetRowsUpdated -> 4, numOutputRows -> 9, numSourceRows -> 9, numTargetFilesRemoved -> 1]|null |
|2 |2020-09-01 16:48:19.493|null |null |DELETE |[predicate -> ["(`ProductID` = 210)"]] |null|null |null |1 |null |false |[numRemovedFiles -> 1, numDeletedRows -> 1, numAddedFiles -> 1, numCopiedRows -> 4] |null |
|1 |2020-09-01 16:48:12.635|null |null |UPDATE |[predicate -> (ProductID#529 = 200)] |null|null |null |0 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 4] |null |
|0 |2020-09-01 16:47:31.819|null |null |CREATE TABLE AS SELECT|[isManaged -> false, description ->, partitionBy -> [], properties -> {}]|null|null |null |null |null |true |[numFiles -> 1, numOutputBytes -> 1027, numOutputRows -> 5] |null |
+-------+-----------------------+------+--------+----------------------+-------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
No need to fear data audits: every change made to the data is backed by a full audit trail.
A simple look at the last operation on a table is as follows:
# Retrieve only the most recent operation on the table
df_lastOperation = deltaTable.history(1)
df_lastOperation.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
| 12|2020-09-01 16:48:...| null| null| UPDATE|[predicate -> (Pr...|null| null| null| 11| null| false|[numRemovedFiles ...| null|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
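Because every entry in the history corresponds to a table version, the audit trail pairs naturally with time travel (covered in Part 1): you can inspect the table exactly as it stood at any recorded version. A minimal sketch, assuming the versions shown above are still within the retention period:
# Read the table as of a specific version taken from the history output
df_v3 = spark.read.format("delta").option("versionAsOf", 3).load("hdfs:///delta_lake/products")
df_v3.show()

# Or as of a timestamp
spark.read.format("delta").option("timestampAsOf", "2020-09-01 16:48:45").load("hdfs:///delta_lake/products").show()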
Schema Evolution – Detecting and Adapting to Schema Changes
People who have been creating data pipelines will surely relate to this issue. We create and deploy our pipelines with a given schema, usually supplied to us at the start of the project. Everything works fine for a while until one day the pipelines error out. It turns out that the schema of the incoming files has changed. Personally, I have been hit by this issue many times. The first time it happened, extensive damage was done to the data because we were not diligent enough to catch the change before it corrupted the data. I still remember spending endless hours dealing with the damage – fixing code and backtracking data.
I learnt an important lesson that day – VALIDATE THE SCHEMA BEFORE INGESTING DATA. That is a topic for a future post. For now, let's focus on how Delta Lake can help.
Delta Lake can safeguard data by enforcing schema validation at the time of writes. What does that mean?
- It means that at write time the schema of the newly incoming data is compared to the schema of the existing data.
- If a discrepancy is found, the transaction is cancelled – no data is written to storage and an exception is raised for the user.
Now let's see it in action. Here is the last file we had ingested previously.

And this is the new file from the next day. Notice that the schema of the file has changed – a new column, Quantity, has been added.

Let's review the PySpark code:
# Read the new day's file, which contains the extra Quantity column
df_productsaug22 = spark.read.csv('hdfs:///delta_lake/raw/products_aug22.csv', header=True, inferSchema=True)
df_productsaug22.show()
deltaTable = DeltaTable.forPath(spark, "hdfs:///delta_lake/products")
# Attempt to append the new data to the existing Delta table
df_productsaug22.write.format("delta").mode("append").save("hdfs:///delta_lake/products")
AnalysisException Traceback (most recent call last)
<ipython-input-15-85affcb142df> in <module>
----> 1 df_productsaug22.write.format("delta").mode("append").save("hdfs:///delta_lake/products")
/opt/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
825 self._jwrite.save()
826 else:
--> 827 self._jwrite.save(path)
828
829 @since(1.4)
~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
/opt/spark/python/pyspark/sql/utils.py in raise_from(e)
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 320f5591-72dd-4f4c-bdac-38f560e90dba).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
-- ProductID: integer (nullable = true)
-- Date: string (nullable = true)
-- Price: double (nullable = true)
Data schema:
root
-- ProductID: integer (nullable = true)
-- Date: string (nullable = true)
-- Price: double (nullable = true)
-- Quantity: integer (nullable = true)
;
Delta Lake promptly rejects the new file because of a schema mismatch. Pretty cool.
What if you are fine with the new column, i.e. you want to ingest the data even though there is a schema mismatch? Go ahead as shown below using option("mergeSchema", "true").
# Append the new data and merge the new Quantity column into the table schema
df_productsaug22.write.format("delta").mode("append").option("mergeSchema", "true").save("hdfs:///delta_lake/products")
spark.table("products").show()
+---------+----------+------+--------+
|ProductID| Date| Price|Quantity|
+---------+----------+------+--------+
| 200|2020-08-22| 25.5| 2|
| 210|2020-08-22| 46.0| 5|
| 220|2020-08-22| 34.56| 6|
| 230|2020-08-22| 23.67| 11|
| 200|2020-08-20| 25.5| null|
| 250|2020-08-21| 99.76| null|
| 230|2020-08-20| 23.67| null|
| 210|2020-08-21| 46.0| null|
| 220|2020-08-20| 34.56| null|
| 260|2020-08-21| 64.55| null|
| 280|2020-08-21| 54.78| null|
| 270|2020-08-21|106.32| null|
| 240|2020-08-20|100.82| null|
+---------+----------+------+--------+
The new column is now part of the Delta Lake metadata. Notice that the Quantity values for rows ingested before 2020-08-22 have been set to null.
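If you want to confirm the change from the metadata side, printing the table schema should now show the Quantity column; a quick sketch using the same table path:
# Verify that Quantity has been merged into the table schema
spark.read.format("delta").load("hdfs:///delta_lake/products").printSchema()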
Partitioning
To appreciably improve the performance of queries in Delta Lake, you should consider creating a table that is partitioned by a column. Choosing the right partitioning column is very important:
- Choose a column with low cardinality, like a date; definitely not a sequential ID column
- Choose a column that produces evenly sized partitions, each ideally large (> 1 GB)
# Read all raw files and write them as a Delta table partitioned by the Date column
df_productsaug_partition = spark.read.csv('hdfs:///delta_lake/raw/*.csv', header=True, inferSchema=True)
df_productsaug_partition.write.format("delta").partitionBy("Date").option("path", "hdfs:///delta_lake/products_p").saveAsTable("products_p")
$ hadoop fs -ls /delta_lake/products_p
Found 4 items
drwxr-xr-x - mkukreja supergroup 0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-20
drwxr-xr-x - mkukreja supergroup 0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-21
drwxr-xr-x - mkukreja supergroup 0 2020-09-01 17:19 /delta_lake/products_p/Date=2020-08-22
drwxr-xr-x - mkukreja supergroup 0 2020-09-01 17:19 /delta_lake/products_p/_delta_log
Notice that Delta Lake has created a partitioned folder structure based on the Date column.
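The payoff comes at query time: a filter on the partitioning column lets Delta Lake scan only the matching folders instead of the whole table. A minimal sketch against the products_p table registered above:
# Only the Date=2020-08-22 partition folder is scanned; the others are pruned
spark.table("products_p").where("Date = '2020-08-22'").show()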
Vacuum
Delta Lake implements data versioning so that it can supply older versions of the data on demand. Storing multiple versions of the same data can get expensive over time, so Delta Lake includes a cleanup mechanism called vacuum that deletes old versions of the data.
# Clean up old data files that are no longer needed by the table
deltaTable.vacuum()
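By default, vacuum() retains the last 7 days (168 hours) of history. You can also pass an explicit retention period in hours, and Delta Lake will refuse anything shorter than the default unless the safety check is relaxed first; a minimal sketch:
# Explicitly keep 7 days (168 hours) of history (the default)
deltaTable.vacuum(168)

# To vacuum more aggressively, the retention safety check must be disabled first
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable.vacuum(24)  # delete unreferenced files older than 24 hours
Keep in mind that once old files are vacuumed, time travel to the versions that depended on them is no longer possible.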
I hope this article was helpful. Delta Lake is covered as part of the Big Data Hadoop, Spark & Kafka course offered by Datafence Cloud Academy. The course is taught online by myself on weekends.