An Efficient Implementation of DBSCAN on PySpark

A new algorithm based on the triangle inequality to efficiently compute distances and merge clusters using a graph

DBSCAN is a well-known clustering algorithm that has stood the test of time, though it is not included in Spark MLlib. There are a few implementations (1, 2, 3), but they are in Scala. An existing implementation in PySpark uses the Cartesian product of the RDD with itself, which results in O(n²) complexity, and possibly O(n²) memory before the filter.

ptsFullNeighborRDD = rdd.cartesian(rdd) \
    .filter(lambda (pt1, pt2): dist(pt1, pt2) < eps) \
    .map(lambda (pt1, pt2): (pt1, [pt2])) \
    .reduceByKey(lambda pts1, pts2: pts1 + pts2) \
    .filter(lambda (pt, pts): len(pts) >= minPts)
source: https://github.com/htleeab/Dbscan-pyspark/blob/master/DBSCAN.py

For a quick primer on the complexity of the DBSCAN algorithm:

https://en.wikipedia.org/wiki/DBSCAN#Complexity

DBSCAN visits each point of the database, possibly multiple times (e.g., as candidates to different clusters). For practical considerations, however, the time complexity is mostly governed by the number of regionQuery invocations. DBSCAN executes exactly one such query for each point, and if an indexing structure is used that executes a neighborhood query in O(log n), an overall average runtime complexity of O(n log n) is obtained (if parameter ε is chosen in a meaningful way, i.e. such that on average only O(log n) points are returned). Without the use of an accelerating index structure, or on degenerated data (e.g. all points within a distance less than ε), the worst case run time complexity remains O(n²). The distance matrix of size (n²-n)/2 can be materialized to avoid distance recomputations, but this needs O(n²) memory, whereas a non-matrix based implementation of DBSCAN only needs O(n) memory.

In this post, we will explore how to implement DBSCAN in PySpark efficiently, avoiding O(n²) operations by reducing the number of distance calculations. We will implement an indexing/partitioning structure based on the triangle inequality to achieve this.

Triangle Inequality

Let's refresh the triangle inequality. If a, b, and c are the three vertices of a triangle, and d is a distance metric, then

d(a, b) ≤ d(a, c) + d(c, b)

d(a, c) ≤ d(a, b) + d(b, c)

d(b, c) ≤ d(b, a) + d(a, c)

In DBSCAN there is a parameter ε, which is used to find the linkage between points. Let us see whether this parameter, together with the triangle inequality, can reduce the number of operations.

Let’s say there are four points x, y, z, and c.

Lemma 1: If d(x, c) ≥ (k+1)ε and d(y, c) < kε, then d(x, y) > ε

As per triangle inequality,

d(x, c) ≤ d(x, y) + d(y, c)

d(x, c) - d(y, c) ≤ d(x, y)

d(x, c) - d(y, c) > (k+1)ε - kε = ε

so d(x, y) > ε

Lemma 2: If d(x, c) ≤ lε and d(z, c) > (l+1)ε, then d(x, z) > ε

As per triangle inequality,

d(z, c) ≤ d(x, z) + d(x, c)

d(z, c) - d(x, c) ≤ d(x, z)

d(z, c) - d(x, c) > (l+1)ε - lε = ε

so d(x, z) > ε

What we can deduce from the above is that if we compute the distances of all points from c, we can filter out points like y and z using these criteria. In other words, we can compute distances from c and partition the points into concentric rings centered at c.

Overlapping Concentric Ring Partitions

What should be the width of the partitions?

From the above lemmas, we can see that if

(m+1)ε ≥ d(x, c) ≥ mε, then we can filter out points y and z if d(y, c) < (m-1)ε and d(z, c) > (m+2)ε

From this, we can deduce that for any point x with (m+1)ε ≥ d(x, c) ≥ mε, it suffices to consider a partition of width 3ε, starting at (m-1)ε and ending at (m+2)ε.

Figure 1: What should be the width of partitions?

Here is a depiction of how it looks. The two-dimensional space is divided into bands of width ε (Euclidean distance from c). The green rings show one partition. x₁ is at distance (m + 1/2)ε from c (the center of the (m-1)ε to (m+2)ε partition). x₂ and x₃ are at distances mε and (m+1)ε from c. It is clear that for x₁, x₂, and x₃, all points of relevance (within a circle of radius ε) are within the partition.

If we created mutually exclusive partitions and computed distances only among points within each partition, the result would be incomplete. For example, the ε-radius circles of x₄ and x₅ overlap two partitions, hence the need for overlapping partitions. One strategy could be to shift each partition by ε, but if the partition width is 3ε, a point may then occur in three different partitions. Instead, partitions of width 2ε are created and shifted by ε, so a point occurs in at most two partitions.

Figure 2: 2ε width partition with ε overlap

The images above show how this partition scheme works. Two partitions in combination allow a range query of radius ε for all the points from mε to (m+1)ε. The first partition covers all the points from mε to (m + 1/2)ε (x₂ covered but x₃ not) and the second covers (m + 1/2)ε to (m+1)ε (x₃ covered but x₂ not).

Partitioning Visualization

Let us see what these partitions look like on some generated data.

Figure 3: Data Points

The above data and image are generated by the following code:
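A minimal sketch of such a data-generation step is shown below; the make_blobs parameters (sample count, number of centers, spread) are illustrative assumptions rather than the original choices.

import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs

# Generate a small two-dimensional data set; the parameters are illustrative.
X, _ = make_blobs(n_samples=750, centers=3, cluster_std=0.4, random_state=0)

plt.scatter(X[:, 0], X[:, 1], s=10)
plt.title("Data Points")
plt.show()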

Figure 4: Few Partitions Generated from Data

The above partitions from data are generated by the following code:
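Below is a sketch of what this partitioning step could look like. The pivot location, the value of ε, and the structure of the RDD rows are assumptions; the names partition_index, distance, and out follow the description that comes next.

import math
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dbscan-pyspark").getOrCreate()
sc = spark.sparkContext

eps = 0.3            # assumed value of ε
pivot = (0.0, 0.0)   # c, the common reference point (assumed to be the origin)

# RDD of (point_id, (x, y)) tuples built from the generated data X above.
rdd = sc.parallelize([(i, (float(x), float(y))) for i, (x, y) in enumerate(X)])

def distance(row):
    # Assign one point to the two overlapping 2ε-wide rings around the pivot.
    point_id, (x, y) = row
    d = math.sqrt((x - pivot[0]) ** 2 + (y - pivot[1]) ** 2)
    ring = int(d / eps)  # index of the ε-wide band the point falls in
    out = []
    # Ring m covers [mε, (m+2)ε), so the point belongs to rings ring-1 and ring.
    for partition_index in (ring - 1, ring):
        out.append((partition_index, [row]))
    return out

partitioned_rdd = rdd.flatMap(distance)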

partition_index identifies each partition. Each data point is put into two partitions, as discussed before, based on its distance from c (the pivot) and ε. The distance method handles one point at a time. In PySpark, the flatMap method is used to map every point into an array of tuples (out).

Merging Partitions

All data points within a partition are merged before generating the visualization. They also need to be merged for the remaining DBSCAN steps on PySpark.

The reduceByKey method is used to merge each partition's data into one record. The use of the word partition here may be confused with PySpark partitions, but those are two separate things (though the partitionBy method could be used to reconcile the two).
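As a one-line sketch, using the hypothetical names from the partitioning sketch above:

# Merge all points that fall into the same ring partition into a single record.
# Each resulting row is (partition_index, [points in that ring]).
merged_partitions = partitioned_rdd.reduceByKey(lambda pts1, pts2: pts1 + pts2)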

After reduceByKey, each row of the RDD is one ring, as depicted in Figure 4. As you can see, there is an overlap, so a point can appear in two rows of the RDD, and that is intentional.

Distance Calculations
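A sketch of one possible version of the per-partition distance step follows; compute_distances is a hypothetical name, and eps and merged_partitions come from the sketches above.

import math

def compute_distances(row):
    # Within one ring partition, find each point's neighbors within eps (brute force).
    _, points = row
    out = []
    for pid, (x1, y1) in points:
        neighbors = {qid for qid, (x2, y2) in points
                     if qid != pid and math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) <= eps}
        out.append((pid, neighbors))
    return out

per_partition_neighbors = merged_partitions.flatMap(compute_distances)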

The above code computes distances within each partition. The output of the method is a list of tuples; each tuple holds the id of a point and the set of its neighbors within ε distance. Since a point can occur in two partitions, we need to combine the sets for a given point so that we get all of its neighbors within ε distance over the whole data set. reduceByKey is used to combine the sets with a union operation.

reduceByKey(lambda x, y: x.union(y))

Combined code till now looks as follows:
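Pieced together from the hypothetical sketches above, the pipeline so far would look roughly like this:

# From raw points to complete ε-neighborhoods (names from the sketches above).
neighbors_rdd = (rdd
                 .flatMap(distance)                       # assign each point to its two rings
                 .reduceByKey(lambda p1, p2: p1 + p2)     # one record per ring
                 .flatMap(compute_distances)              # neighborhoods inside each ring
                 .reduceByKey(lambda x, y: x.union(y)))   # merge the two copies of each point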

Core and Border Point Labeling

Once we have the neighbors within ε distance for a point, we can identify whether it is a core point or a border point.

Core point: there are at least min_pts points within ε distance.

Border point: there are fewer than min_pts points within ε distance, but at least one of them is a core point.

To identify core and border points, we first assign them to a cluster. For each core point, we create a cluster label equal to its id (assuming the ids are unique). We create a tuple for the core point and each of its neighbors of the form (id, [(cluster_label, is_core_point)]). All the neighbors in this scenario are labeled as base points. Let's take an example:

min_pts = 3
Input: (3, {4, 6, 2})
Output: [(3, [(3, True)]), (4, [(3, False)]), (6, [(3, False)]), (2, [(3, False)])]

The input is a tuple where 3 is the id of the point and {4, 6, 2} is the set of its neighbors within ε distance.

As one can see, all points are assigned cluster label 3. Point 3 is assigned True for is_core_point since it is a core point, while all the other points are considered base points and assigned False for is_core_point.

We may have similar input tuples for 4, 6, and 2, in which they may or may not be assigned as core points. The idea is to eventually combine all cluster labels for a point; if at least one of its is_core_point assignments is True, then it is a core point, otherwise it is a border point.
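A sketch of a labeling function that reproduces the output in the example above could look as follows; to_cluster_labels is a hypothetical name, and the value of min_pts is an assumption.

min_pts = 10  # assumed value; the example above uses 3

def to_cluster_labels(row):
    # Emit (point_id, [(cluster_label, is_core_point)]) pairs for one neighborhood.
    point_id, neighbors = row
    out = []
    if len(neighbors) >= min_pts:
        # A core point starts a cluster labeled with its own id ...
        out.append((point_id, [(point_id, True)]))
        # ... and its neighbors are provisionally attached to that cluster.
        for neighbor_id in neighbors:
            out.append((neighbor_id, [(point_id, False)]))
    return out

labeled_rdd = neighbors_rdd.flatMap(to_cluster_labels)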

We combine all (cluster_label, is_core_point) tuples for a point using the reduceByKey method, then determine whether it is a core point while collecting all cluster labels for that point. If it is a border point, we keep only one cluster label.
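Sketched out, the combining step could look like this (both function names are hypothetical):

def combine_labels(labels1, labels2):
    # reduceByKey combiner: concatenate all (cluster_label, is_core_point) pairs.
    return labels1 + labels2

def resolve_point(row):
    # Decide whether a point is a core or border point and keep the relevant labels.
    point_id, labels = row
    cluster_labels = [label for label, _ in labels]
    is_core = any(flag for _, flag in labels)
    if is_core:
        # A core point keeps every cluster label it received; the graph step
        # later connects them.
        return point_id, cluster_labels, True
    # A border point keeps only the first cluster label it received.
    return point_id, cluster_labels[:1], False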

The methods above combine all cluster labels for a point; again, if it is a border point, we keep only the first cluster label.

The PySpark code up to this point looks like the following:
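Chaining the hypothetical pieces from the previous sketches:

# Per-point cluster labels and core/border flags, built on neighbors_rdd from above.
combine_cluster_rdd = (neighbors_rdd
                       .flatMap(to_cluster_labels)    # provisional (label, is_core) pairs
                       .reduceByKey(combine_labels)   # gather all pairs per point
                       .map(resolve_point))           # (point_id, cluster_labels, is_core)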

Connected Core and Border Points

Now each point has one or more cluster labels. If a point has more than one cluster label, it means those clusters are connected, and these connected clusters are the final clusters we need for DBSCAN. We solve this by creating a graph whose vertices are cluster labels, with an edge between two cluster labels whenever they are assigned to the same point.
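The sketch below uses the GraphFrames package for this step; its availability and the checkpoint directory path are assumptions, and the vertices and edges are built from combine_cluster_rdd as described next.

from itertools import combinations
from graphframes import GraphFrame

# Vertices: every cluster label. Edges: labels that co-occur on the same point.
vertices = (combine_cluster_rdd
            .flatMap(lambda row: [(label,) for label in row[1]])
            .distinct()
            .toDF(["id"]))
edges = (combine_cluster_rdd
         .flatMap(lambda row: list(combinations(row[1], 2)))
         .distinct()
         .toDF(["src", "dst"]))

sc.setCheckpointDir("/tmp/graphframes-checkpoints")  # required by connectedComponents
graph = GraphFrame(vertices, edges)
components = graph.connectedComponents()  # columns: id (cluster label), component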

In the above code, combine_cluster_rdd is a collection of rows, where each row holds a point and its cluster labels. Each cluster label is a vertex, and the combinations of cluster labels for a point are the edges. The connected components of this graph give a mapping between each cluster label and a connected cluster, which we can apply to the points to get the final clusters.
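As a final sketch, the component mapping can be joined back to the points; the column names and the left join here are assumptions about how the result could be assembled.

# Map every point to the connected component of one of its cluster labels
# (all labels of a core point end up in the same component) and keep the flag.
points_df = (combine_cluster_rdd
             .map(lambda row: (row[0], row[1][0], row[2]))
             .toDF(["point", "label", "is_core"]))

result_df = (points_df
             .join(components, points_df.label == components.id, "left")
             .select("point", "component", "is_core"))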

The code above shows how the final step looks; it returns a Spark DataFrame with the point id, the cluster component label, and a boolean indicating whether the point is a core point.

Comparison

Now I compare the results, in terms of accuracy, with the sklearn implementation of DBSCAN.

The make_blobs method creates blobs around three input centers. Running DBSCAN using sklearn and my implementation with ε=0.3 and min_pts=10 gives the following results.
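A sketch of the sklearn side of this comparison is below; the centers and spread are assumptions.

from sklearn.cluster import DBSCAN
from sklearn.datasets import make_blobs

centers = [(1, 1), (5, 5), (1, 5)]  # illustrative centers
X_blobs, _ = make_blobs(n_samples=750, centers=centers, cluster_std=0.4, random_state=0)

# sklearn labels noise as -1; core samples are listed in db.core_sample_indices_.
db = DBSCAN(eps=0.3, min_samples=10).fit(X_blobs)
sk_labels = db.labels_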

Left: sklearn vs Right: PySpark-based implementation (ε=0.3 and min_pts=10)

Core points are drawn as bigger circles, border points as smaller ones. Noise points are colored black and are the same in both implementations. One thing that jumps out is that some border points are assigned to different clusters, which speaks to the non-deterministic nature of DBSCAN. My other post also talks about it.

Some Notes on DBSCAN Algorithm

Left: sklearn vs Right: PySpark-based implementation (ε=0.2 and min_pts=10)

For ε=0.2, the border points are assigned to the same clusters. Following is some code and the results on ring-shaped data.
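A sketch that generates comparable ring data is shown below; the make_circles parameters are assumptions.

from sklearn.cluster import DBSCAN
from sklearn.datasets import make_circles

# Two concentric rings; the noise level and scale factor are illustrative.
X_rings, _ = make_circles(n_samples=750, factor=0.5, noise=0.05, random_state=0)
ring_labels = DBSCAN(eps=0.3, min_samples=5).fit_predict(X_rings)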

Number of Operations

For n=750, the number of distance operations required by a naive implementation of DBSCAN would be n(n-1)/2 = 280,875. Because the partitions are created based on ε, the smaller ε is, the fewer distance operations are needed. In this case, a total of 149,716 (ε=0.2) and 217,624 (ε=0.3) operations were needed.

Comparison of Ring Data

Left: sklearn vs Right: PySpark-based implementation (ε=0.3 and min_pts=5)
Left: sklearn vs Right: PySpark-based implementation (ε=1 and min_pts=5)

Conclusion

This post presented a PySpark implementation whose efficiency depends on the value of ε, with the following steps:

  1. Partitioning data: partition the points into overlapping rings of 2ε width, shifted by ε.
  2. Merging partition data: so that all of a partition's data is in a single record.
  3. Distance calculations: calculate distances within each partition.
  4. Point labeling: based on the number of neighbors, label core and border points.
  5. Connected clusters: use GraphFrames to connect cluster labels and obtain the final DBSCAN labels.

A comparison with the sklearn implementation shows the accuracy of the algorithm and of the implementation in this post.

Is it efficient?

On a local machine running both the driver and worker nodes, this implementation is slower than sklearn. There are a couple of possible reasons, which need to be investigated:

  1. For a small amount of data, sklearn may be much faster, but is that still the case for big data?
  2. GraphFrames takes quite some time to execute; could the connected-components analysis instead be performed on the driver with some other graph library?

Implementation

The complete PySpark implementation can be found at:

SalilJain/pyspark_dbscan

