K-Means++ Implementation in Python and Spark

Syed Sadat Nazrul
Towards Data Science
7 min readAug 7, 2018

--

For this tutorial, we will be using PySpark, the Python wrapper for Apache Spark. While PySpark has a nice K-Means++ implementation, we will write our own one from scratch.

Configure PySpark Notebook

If you do not have PySpark on Jupyter Notebook, I found this tutorial useful:

Load Dataset as RDD

Before starting, ensure you have access to the weather station dataset:
https://github.com/yoavfreund/UCSD_BigData_2016/tree/master/Data/Weather

def parse_data(row):
'''
Parse each pandas row into a tuple of
(station_name, feature_vec),`l

where feature_vec is the concatenation of the projection vectors
of TAVG, TRANGE, and SNWD.
'''
return (row[0],
np.concatenate([row[1], row[2], row[3]]))
## Read data
data = pickle.load(open("stations_projections.pickle", "rb"))
rdd = sc.parallelize([parse_data(row[1])
for row in data.iterrows()])

Let’s look at the first row:

rdd.take(1)

The name of the weather station is USC00044534 and the rest are the different weather information we will use for clustering.

Importing Libraries

import numpy as np 
import pickle
import sys
import time
from numpy.linalg import norm
from matplotlib import pyplot as plt

Defining Global Parameters

# Number of centroids
K = 5
# Number of K-means runs that are executed in parallel. Equivalently, number of sets of initial points
RUNS = 25
# For reproducability of results
RANDOM_SEED = 60295531
# The K-means algorithm is terminated when the change in the
# location of the centroids is smaller than 0.1
converge_dist = 0.1

Utility Functions

The following functions will come in handy as we move forwards:

def print_log(s):
'''
Print progress logs
'''
sys.stdout.write(s + "\n")
sys.stdout.flush()

def compute_entropy(d):
'''
Compute the entropy given the frequency vector `d`
'''
d = np.array(d)
d = 1.0 * d / d.sum()
return -np.sum(d * np.log2(d))


def choice(p):
'''
Generates a random sample from [0, len(p)),
where p[i] is the probability associated with i.
'''
random = np.random.random()
r = 0.0
for idx in range(len(p)):
r = r + p[idx]
if r > random:
return idx
assert(False)

Initialization of Centroids

For K-Means++, we wish to have the centroids as far apart as possible upon initialization. The idea is to have the centroids to be closer to the distinct cluster centers upon initialization and hence reach convergence faster.

def kmeans_init(rdd, K, RUNS, seed):
'''
Select `RUNS` sets of initial points for `K`-means++
'''
# the `centers` variable is what we want to return
n_data = rdd.count()
shape = rdd.take(1)[0][1].shape[0]
centers = np.zeros((RUNS, K, shape))
def update_dist(vec, dist, k):
new_dist = norm(vec - centers[:, k], axis=1)**2
return np.min([dist, new_dist], axis=0)
# The second element `dist` in the tuple below is the
# closest distance from each data point to the selected
# points in the initial set, where `dist[i]` is the
# closest distance to the points in the i-th initial set
data = (rdd
.map(lambda p: (p, [np.inf] * RUNS)) \
.cache())
# Collect the feature vectors of all data points
# beforehand, might be useful in the following
# for-loop
local_data = (rdd
.map(lambda (name, vec): vec)
.collect())
# Randomly select the first point for every run of
# k-means++, i.e. randomly select `RUNS` points
# and add it to the `centers` variable
sample = [local_data[k] for k in
np.random.randint(0, len(local_data), RUNS)]
centers[:, 0] = sample
for idx in range(K - 1):
########################################################
# In each iteration, you need to select one point for
# each set of initial points (so select `RUNS` points
# in total). For each data point x, let D_i(x) be the
# distance between x and the nearest center that has
# already been added to the i-th set. Choose a new
# data point for i-th set using a weighted probability
# where point x is chosen with probability proportional
# to D_i(x)^2 . Repeat each data point by 25 times
# (for each RUN) to get 12140x25
########################################################
#Update distance
data = (data
.map(lambda ((name,vec),dist):
((name,vec),update_dist(vec,dist,idx)))
.cache())
#Calculate sum of D_i(x)^2
d1 = data.map(lambda ((name,vec),dist): (1,dist))
d2 = d1.reduceByKey(lambda x,y: np.sum([x,y], axis=0))
total = d2.collect()[0][1]
#Normalize each distance to get the probabilities and
#reshapte to 12140x25
prob = (data
.map(lambda ((name,vec),dist):
np.divide(dist,total))
.collect())
prob = np.reshape(prob,(len(local_data), RUNS))
#K'th centroid for each run
data_id = [choice(prob[:,i]) for i in xrange(RUNS)]
sample = [local_data[i] for i in data_id]
centers[:, idx+1] = sample
return centers # The second element `dist` in the tuple below is the
# closest distance from each data point to the selected
# points in the initial set, where `dist[i]` is the
# closest distance to the points in the i-th initial set
data = (rdd
.map(lambda p: (p, [np.inf] * RUNS)) \
.cache())
# Collect the feature vectors of all data points
# beforehand, might be useful in the following
# for-loop
local_data = (rdd
.map(lambda (name, vec): vec)
.collect())
# Randomly select the first point for every run of
# k-means++, i.e. randomly select `RUNS` points
# and add it to the `centers` variable
sample = [local_data[k] for k in
np.random.randint(0, len(local_data), RUNS)]
centers[:, 0] = sample
for idx in range(K - 1):
########################################################
# In each iteration, you need to select one point for
# each set of initial points (so select `RUNS` points
# in total). For each data point x, let D_i(x) be the
# distance between x and the nearest center that has
# already been added to the i-th set. Choose a new
# data point for i-th set using a weighted probability
# where point x is chosen with probability proportional
# to D_i(x)^2 . Repeat each data point by 25 times
# (for each RUN) to get 12140x25
########################################################
#Update distance
data = (data
.map(lambda ((name,vec),dist):
((name,vec),update_dist(vec,dist,idx)))
.cache())
#Calculate sum of D_i(x)^2
d1 = data.map(lambda ((name,vec),dist): (1,dist))
d2 = d1.reduceByKey(lambda x,y: np.sum([x,y], axis=0))
total = d2.collect()[0][1]
#Normalize each distance to get the probabilities and
# reshape to 12140x25
prob = (data
.map(lambda ((name,vec),dist):
np.divide(dist,total))
.collect())
prob = np.reshape(prob,(len(local_data), RUNS))
#K'th centroid for each run
data_id = [choice(prob[:,i]) for i in xrange(RUNS)]
sample = [local_data[i] for i in data_id]
centers[:, idx+1] = sample
return centers

K-Means++ Implementation

Now that we have the initialization function, we can now use this to implement the K-Means++ algorithm.

def get_closest(p, centers):
'''
Return the indices the nearest centroids of `p`.
`centers` contains sets of centroids, where `centers[i]` is
the i-th set of centroids.
'''
best = [0] * len(centers)
closest = [np.inf] * len(centers)
for idx in range(len(centers)):
for j in range(len(centers[0])):
temp_dist = norm(p - centers[idx][j])
if temp_dist < closest[idx]:
closest[idx] = temp_dist
best[idx] = j
return best


def kmeans(rdd, K, RUNS, converge_dist, seed):
'''
Run K-means++ algorithm on `rdd`, where `RUNS` is the number of
initial sets to use.
'''
k_points = kmeans_init(rdd, K, RUNS, seed)
print_log("Initialized.")
temp_dist = 1.0

iters = 0
st = time.time()
while temp_dist > converge_dist:
# Update all `RUNS` sets of centroids using standard k-means
# algorithm

# Outline:
# - For each point x, select its nearest centroid in i-th
# centroids set

# - Average all points that are assigned to the same
# centroid

# - Update the centroid with the average of all points
# that are assigned to it

temp_dist = np.max([
np.sum([norm(k_points[idx][j] - new_points[(idx,
j)]) for idx,j in new_points.keys()])
])

iters = iters + 1
if iters % 5 == 0:
print_log("Iteration %d max shift: %.2f (time: %.2f)" %
(iters, temp_dist, time.time() - st))
st = time.time()

# update old centroids
# You modify this for-loop to meet your need
for ((idx, j), p) in new_points.items():
k_points[idx][j] = p

return k_points

Benchmark Test

The beauty of K-Means++ over K-Means is its speed of convergence due to its initialization algorithm. Also, Spark is being used to parallelize this algorithm as much as possible. Hence, let use Benchmark this implementation.

st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, RUNS, converge_dist,
np.random.randint(1000))
group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
.collect()

print "Time takes to converge:", time.time() - st

Depending on the number of processor cores, core memory set for each executor, and the number of executors used, this result will differ.

Cost Function

In order to verify the accuracy of the model, we need to pick a cost function and attempt to minimize it using the model. The final cost function will give us an idea for accuracy. For K-Means, we look at the distance between the datapoints and the nearest centroids.

def get_cost(rdd, centers):
'''
Compute the square of l2 norm from each data point in `rdd`
to the centroids in `centers`
'''
def _get_cost(p, centers):
best = [0] * len(centers)
closest = [np.inf] * len(centers)
for idx in range(len(centers)):
for j in range(len(centers[0])):
temp_dist = norm(p - centers[idx][j])
if temp_dist < closest[idx]:
closest[idx] = temp_dist
best[idx] = j
return np.array(closest)**2

cost = rdd.map(lambda (name, v): _get_cost(v,
centroids)).collect()
return np.array(cost).sum(axis=0)

cost = get_cost(rdd, centroids)
log2 = np.log2
print "Min Cost:\t"+str(log2(np.max(cost)))
print "Max Cost:\t"+str(log2(np.min(cost)))
print "Mean Cost:\t"+str(log2(np.mean(cost)))

Min Cost: 33.7575332525
Max Cost: 33.8254902123
Mean Cost: 33.7790236109

Final Result

Here is the final result:

print 'entropy=',entropybest = np.argmin(cost)
print 'best_centers=',list(centroids[best])

--

--

Using Machine Learning to catch cyber and financial criminals by day … and writing cool blogs by night. Views expressed are of my own.