
Delta Lake – Partitioning, Z-Order and Liquid Clustering

How are different partitioning/clustering methods implemented in Delta? How do they work in practice?

Photo by frame harirak on Unsplash

One of the things that has always made Big Data difficult is right there in its name: it is big. Partitioning, especially when done well, has always been a way to improve query execution times on vast amounts of data by reducing the data that needs to be read to a subset. However, partitioning data is complex and requires careful thought and some upfront planning, as what fits today’s requirements might not fit future ones. In Hive-style partitioning, for instance, the partition columns might need to change, or their cardinality might grow to the point where the data becomes over-partitioned (the small-files problem), requiring a complete restructuring of the data, which is far from ideal.

Z-Order clustering is another technique used for data-skipping, also avoiding full data scans. However, it has some limitations. One is that newly ingested data is not ordered by default, so users are required to recluster it, which means that already clustered data is reclustered and rewritten, increasing the time spent on the operation. Z-Order users also need to specify the clustering columns every time they run the command, as these columns are not stored in any table property.

This is where Liquid Clustering enters the game. The premise is that it fits seamlessly into the current layout of the data and can also adapt to future needs without rewriting any already clustered data.

In this post, we will explain the details of the different data pruning strategies in Delta and how they are applied.

Partition pruning – Hive-style partitioning

Hive-style partitioning – Image by author

Hive-style partitioning is a way of organizing a table into small chunks. These chunks of data are stored in subfolders, one per partition value.

dbfs://people10m/gender=M/data_0.json
dbfs://people10m/gender=M/data_1.json
dbfs://people10m/gender=F/data_0.json
dbfs://people10m/gender=F/data_1.json

This method is not native to Delta, that is, it is not part of the Delta protocol. However, since Delta is built on top of Apache Spark, the old Hive-style partitioning is also an option that can work well in some scenarios.

Several mechanisms deal with this type of partitioning in a way that makes it entirely invisible to the end user. In Apache Spark, when a user reads the dataset, the gender column is automatically added to the schema with the respective value and can be queried just like a regular column. This technique is called partition discovery, and it is handled by DataSource’s resolveRelation, which infers the partition columns from the given base paths. On the write side, when a user saves a DataFrame with partitionBy, an InsertIntoHadoopFsRelationCommand is executed as part of the execution plan. It calls FileFormatWriter, which spawns a write job for each partition of the underlying RDD, excluding the partition columns from the final schema and creating buckets for them.

In the above example, since the query only selects data for which the gender is F, it will only need to physically scan that folder, resulting in effective data-skipping as it only reads half of the files of the dataset. This is called partition pruning.
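To make this concrete, here is a minimal PySpark sketch of writing and reading a Hive-style partitioned table. The paths and the people10m/gender names follow the example above and are purely illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical source path for the raw people10m dataset.
people = spark.read.json("dbfs:/people10m_raw")

# partitionBy creates one gender=<value> subfolder per distinct value.
people.write.format("delta").partitionBy("gender").save("dbfs:/people10m")

# Filtering on the partition column lets the engine scan only the gender=F folder.
females = (
    spark.read.format("delta")
    .load("dbfs:/people10m")
    .where("gender = 'F'")
)
females.explain()  # the physical plan should show a partition filter on gender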

There are some downsides to this approach, especially when partition columns with very high cardinality are chosen, or when multiple partition levels are used, resulting in many small files and, consequently, worse read performance. In addition, once this partitioning strategy is defined, it cannot be changed without rewriting all the data, as it is defined at the physical level.

I/O pruning – Z-Order

Another technique that is used for effective data-skipping is filtering on file-level statistics. In this technique, each file has available statistics that can be used as an indicator of whether or not the file is worth reading. By default, Delta stores statistics on minimum, maximum, and null counts for the first 32 columns.
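If you want to see these statistics for yourself, they live in the table’s transaction log. Below is a small sketch, assuming a Delta table at an illustrative path; each AddFile action exposes a stats field (a JSON string with minValues, maxValues and nullCount).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the first commit file of the transaction log (path is illustrative).
log = spark.read.json("dbfs:/people10m/_delta_log/00000000000000000000.json")

# Each AddFile action carries the per-file statistics as a JSON string.
log.where("add IS NOT NULL").select("add.path", "add.stats").show(truncate=False)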

Let’s take, for instance, a single column, id, of the people10m public dataset. If we use repartitionByRange to organize the data on that column into 5 different files, the min/max statistics distribution might look similar to this:

Files after range partitioning by column Id – Image by Author
Select the first 20,000 employees of the company – Image by Author

Running the above query results in a good plan, given that our query only filters on that column and all the files contain disjoint sets of IDs. This makes it easy for the engine to pick the correct files to scan without any false positives.
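Here is a sketch of the setup described above: range-partitioning people10m into 5 files on id and then filtering. The paths are illustrative, and I’m assuming the pictured query is a simple id filter for the first 20,000 employees.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

people = spark.read.format("delta").load("dbfs:/people10m")

# Write 5 files with disjoint id ranges, mimicking the min/max layout above.
(
    people.repartitionByRange(5, "id")
    .write.format("delta")
    .save("dbfs:/people10m_by_id")
)

# Only the file whose [min, max] id range overlaps the predicate is read.
first_20k = (
    spark.read.format("delta")
    .load("dbfs:/people10m_by_id")
    .where("id <= 20000")
)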

What if we want to add another column to the query?

Let’s assume that we also want to filter on the salary of the employees.

Select the first 20,000 employees of the company with a salary greater than 40,000 – Image by Author

After we range partition our files on both columns we end up with something like this:

Files after range partitioning by column Id and salary – Image by Author

Since salaries have no direct relation to IDs, organizing the files with the previous linear method results in data that is effectively sorted only by the first column. By simply filtering for salaries greater than 40,000, we end up reading all five files instead of just one.

How can we solve this? Is there any way we can group multiple statistics in a single dimension while preserving locality so that our range partitioning just works?

If you guessed Z-Ordering, you guessed right. If you guessed space-filling curves you were even more right!

What is a space-filling curve, and should I care about it? A space-filling curve is a curve that traverses all the points in an embedding space. Some of these curves are able to map higher-dimensional points into a single dimension while maintaining proximity in the original space. Sounds complex? It isn’t. Below we’ll give a bit more detail on how these curves work.

Z-Order curve

Z-Order curves were the first space-filling curve implemented for clustering in Delta, hence the name of the operation.

Level 1 Z-Order curve – Image by author

Z-Order values, the points that form the curve in the shape of a Z, are computed using a technique called bit interleaving. Bit interleaving is a way of representing an N-dimensional coordinate as a single sequence of bits. For instance, with a 4-bit representation (0000 to 1111) we can encode any coordinate of a 4×4 grid by alternating the bits of each axis, one bit at a time. Below we’ll go through a more visual example of this technique.
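Before the visual example, here is a tiny Python illustration of bit interleaving for a 4×4 grid (2 bits per axis). It is a toy sketch of the idea, not Delta’s internal code.

def interleave_bits(x, y, bits=2):
    """Build a Z-Order value by alternating the bits of x and y,
    from most significant to least significant."""
    z = 0
    for i in range(bits - 1, -1, -1):
        z = (z << 1) | ((x >> i) & 1)
        z = (z << 1) | ((y >> i) & 1)
    return z

# A few example points of the 4x4 grid and their Z-Order values.
print(interleave_bits(0, 0))  # 0  (binary 0000)
print(interleave_bits(0, 1))  # 1  (binary 0001)
print(interleave_bits(1, 0))  # 2  (binary 0010)
print(interleave_bits(3, 3))  # 15 (binary 1111)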

In Delta, Z-Ordering is used to group data in a way that makes data-skipping effective. All Z-Order columns are "marked" to be range-partitioned using the RangePartitionId expression. This expression is just a placeholder that is handled by an optimizer rule, which samples the RDD to find the range boundaries for the columns. (If you have ever Z-Ordered a decent-sized dataset more than once, you probably noticed that its file statistics are not deterministic. That is because Delta uses reservoir sampling to avoid reading the whole dataset when calculating range IDs.) Then, all the computed range IDs are converted to bytes and interleaved, which results in the row’s Z-Order value.
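From a user’s point of view, all of this happens behind a single command. Below is a minimal sketch using the open-source delta-spark Python API (the table path is illustrative).

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

delta_table = DeltaTable.forPath(spark, "dbfs:/people10m_by_id")

# Equivalent to: OPTIMIZE delta.`dbfs:/people10m_by_id` ZORDER BY (id, salary)
delta_table.optimize().executeZOrderBy("id", "salary")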

Below we’ll illustrate how Z-Order works in Delta, in a simplified way, for a group of 6 records and 3 partitions.

Z-Order optimization for 6 records to 3 different range IDs – Image by author

Hilbert curve

The better a curve is at preserving locality, the fewer files we’ll have to read due to false positives. That is the reason why the Hilbert curve is more commonly used in scenarios where locality preservation is essential.

At the time of writing, Hilbert curves are not implemented in the open-source version of Delta. However, they are the default curve used by Databricks’ Z-Order implementation, since they provide better data locality for higher-dimensional data compared to Z-Order curves.

Hilbert curve – Image by author

The Hilbert curve primitive can appear in four different orientations, each one derived from the one above by a 90º rotation.

But why is the Hilbert curve better at preserving locality than the default Z-Order curve?

Adjacent points on a Hilbert curve always have a distance of 1. This is unlike the Z-Order curve, where the occasional larger jumps between consecutive points can produce Z-Ordered files with a very large min/max range, making their statistics nearly useless for data-skipping.

Distance between two adjacent points on Z-Order curve – Image by author

There are several implementations of the algorithm but, in this post, I’ll cover a neat iterative approach by John Skilling from "Programming the Hilbert curve". The algorithm might be confusing as it involves some bit manipulation, so feel free to skip to the next section if you don’t need the details.

Keep in mind that since the Databricks code is proprietary, the following examples might not represent the current implementation.

The J. Skilling encoding method interleaves the bits and encodes them using Gray code. This way, only one bit changes at a time, so the grid is traversed only in vertical or horizontal steps. It then traverses the encoded bits and applies a series of bit exchanges and inversions, which returns the bit representation of the coordinates; these can be retrieved by deinterleaving the result.

Skilling transform which transforms cartesian points to a Hilbert index – From Programming the Hilbert curve

Similarly to Z-Order, what we need is a way of encoding a group of coordinates of arbitrary dimension into a single point. To achieve that, we run the previous algorithm backward, so that we can retrieve the point on the Hilbert curve. There are then two cycles: one that iterates over the encoded bits, from the most to the least significant, until p-2, where p is the number of bits in each axis, and an inner cycle that iterates from the least significant bit until n-1, where n is the number of dimensions. Depending on the current bit, we either exchange bits or invert them. Finally, we Gray-decode the bits and get our point.

Below we’ll go through how to encode the coordinates (2, 0), which represent the point number 14 in the Hilbert curve.

Algorithm used to transform cartesian coordinates into Hilbert curve points – Image by author
4×4 Hilbert curve – Image by Author
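To make the steps above concrete, here is a Python sketch of the Skilling transform run in the encoding direction (coordinates to Hilbert index). It follows the published algorithm, not Databricks’ proprietary code, and reproduces the (2, 0) → 14 example.

def axes_to_hilbert(coords, bits):
    """Encode n-dimensional coordinates into a single Hilbert index.

    coords: one integer per dimension, each smaller than 2**bits.
    bits:   number of bits per axis (a 4x4 grid needs bits=2).
    """
    x = list(coords)
    n = len(x)

    # Undo the exchanges/inversions (the decoding transform run backward).
    q = 1 << (bits - 1)
    while q > 1:
        p = q - 1
        for i in range(n):
            if x[i] & q:                  # invert the low bits of axis 0
                x[0] ^= p
            else:                         # exchange low bits of axis i and axis 0
                t = (x[0] ^ x[i]) & p
                x[0] ^= t
                x[i] ^= t
        q >>= 1

    # Gray-code step (Skilling labels this part "Gray encode").
    for i in range(1, n):
        x[i] ^= x[i - 1]
    t = 0
    q = 1 << (bits - 1)
    while q > 1:
        if x[n - 1] & q:
            t ^= q - 1
        q >>= 1
    for i in range(n):
        x[i] ^= t

    # Interleave the transposed bits into a single Hilbert index.
    h = 0
    for bit in range(bits - 1, -1, -1):
        for i in range(n):
            h = (h << 1) | ((x[i] >> bit) & 1)
    return h

print(axes_to_hilbert([2, 0], bits=2))  # 14, matching the worked example above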

From here, we’ll assume the process is the same as the Z-Order implementation where the data is range partitioned and close records are written in the same file.

Liquid clustering

So, what is Liquid Clustering after all? It is nothing more than Hilbert curves combined with a new feature called ZCubes that enables incremental clustering!

The OPTIMIZE ZORDER BY command requires data to be fully rewritten, which is very expensive for large tables. Also, when something fails in the middle of an OPTIMIZE ZORDER run, everything needs to start again from the beginning, which can make it quite cumbersome.

What are ZCubes?

ZCubes are groups of files produced by the same OPTIMIZE job. This way, an OPTIMIZE job on a huge table can be split into several smaller jobs, each generating a new ZCube and a new entry in the Delta log, which enables incremental clustering. Each newly optimized file carries a ZCUBE_ID property in its AddFile metadata, making it possible to distinguish optimized files from non-optimized ones (those without an associated ZCube).

There are two new configurable ZCube properties:

  • MIN_ZCUBE_SIZE sets the minimum size of a ZCube. ZCubes under this size are still considered by OPTIMIZE jobs, and new files can be merged into them until they reach this threshold (defaults to 100 GB). These cubes are called partial ZCubes.
  • TARGET_CUBE_SIZE sets the target size for a finished cube, one whose files exceed the target size in total. These cubes are called stable ZCubes.

Stable ZCubes can become partial ZCubes again if DELETE commands end up invalidating enough files to bring them below MIN_ZCUBE_SIZE.

How does it adapt to new clustering columns seamlessly?

When users change clustering columns, only ZCubes that contain the same clustering columns are considered for optimization. The other cubes stay untouched and new ones are created.

How does it work in practice?

When an OPTIMIZE command is issued on the table, Delta picks the files that are eligible for ZCube generation: files that are part of a partial ZCube (and can therefore be further optimized) and new, unclustered files. Then a planning step takes place, which packs these files into several ZCubes, each an OPTIMIZE job that will run independently of the others.

OPTIMIZE flow with Liquid Clustering enabled – Image by Author
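As an illustration only, the file-selection step could be sketched like this. The names and structures below are hypothetical, not Delta’s actual internals: new files and files from partial ZCubes are candidates for re-clustering, while files in stable ZCubes are left untouched.

MIN_ZCUBE_SIZE = 100 * 1024**3  # illustrative 100 GB threshold

def pick_candidate_files(add_files, zcube_sizes):
    """add_files: dicts with an optional 'zcube_id' key (hypothetical shape).
    zcube_sizes: total size in bytes of each existing ZCube, keyed by id."""
    candidates = []
    for f in add_files:
        zcube = f.get("zcube_id")
        if zcube is None:
            candidates.append(f)                      # never clustered yet
        elif zcube_sizes[zcube] < MIN_ZCUBE_SIZE:
            candidates.append(f)                      # belongs to a partial ZCube
        # files in stable ZCubes are not rewritten
    return candidates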

How can I enable/disable Liquid Clustering?

--New tables
CREATE TABLE <table>
USING delta
CLUSTER BY (<col1>, <col2>, ...)

--Existing tables
ALTER TABLE <table>
CLUSTER BY (<col1>, <col2>, ...)

--Remove liquid clustering
ALTER TABLE <table>
CLUSTER BY NONE

Since clustering columns are defined at the table level, the OPTIMIZE command doesn’t need to define any parameters.
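For instance, once the clustering columns are set, a plain OPTIMIZE call (issued here through spark.sql, with an illustrative table name) incrementally clusters newly ingested data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# No ZORDER BY clause is needed; the clustering columns come from the table metadata.
spark.sql("OPTIMIZE people10m")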


Note: This is still under proposal and might be subject to changes.

Conclusion

In this blog post, we went through the details of the different partitioning and clustering alternatives available in Delta Lake. We covered Hive-style partitioning, Z-Order, and their current issues to show how Liquid Clustering is able to solve them.

Liquid Clustering is very promising as it is easier to use, clusters incrementally with better performance, and supports changes to clustering columns without any overhead. There are several performance comparisons available out there if you are curious, and you can already try it if you are using Databricks Runtime 13.3+. Databricks’ recommendation is to switch all current partition columns and Z-Order columns to clustering columns for better performance.

If you are using open-source Delta, while Liquid Clustering is not available, make sure to check my previous post on how to keep your tables fast and clean:

Delta Lake— Keeping it fast and clean

References

https://docs.databricks.com/en/delta/clustering.html

https://docs.google.com/document/d/e/2PACX-1vREkVPDxqlKrwnaQ7Et1EnaiCF-VhFXCwit7bGSomWKtGEfkxbuGhX4GP3cJ20LgllYfjzsjr2lyY5y/pub#kix.301alpimymwh

https://pubs.aip.org/aip/acp/article-abstract/707/1/381/719611/Programming-the-Hilbert-curve

https://en.wikipedia.org/wiki/Z-order_curve

https://en.wikipedia.org/wiki/Hilbert_curve

