Batched K-Means with Python Numba and CUDA C

How to Speed Up Your Data Analysis 1600x vs Scikit-Learn (With Code!)

Alex Shao
Towards Data Science

--

Image generated using Midjourney based on author’s drawing

Parallelizing data analysis workloads can be a daunting task, especially when there is no efficient off-the-shelf implementation available for your specific use case. In this tutorial, I will walk through the principles of writing CUDA kernels in both C and Python Numba, and how those principles can be applied to the classic k-means clustering algorithm. By the end of this article, you will be able to write a custom parallelized implementation of batched k-means in both C and Python, achieving up to 1600x speedup compared to the standard scikit-learn implementation. If you want to jump straight to the code, it is available in Colab.

Introduction

We will study two frameworks for parallelizing on NVIDIA GPUs: Python Numba and CUDA C. We will then implement the same batched k-means algorithm in both and benchmark them against scikit-learn.

Numba is a gentler learning curve for those who prefer Python over C. Numba compiles portions of your code into specialized CUDA functions called kernels. CUDA C, is a layer lower on the abstraction tree, which offers more refined control. The Colab examples are designed to mirror the algorithm implementations tightly, so once you understand one of them, you can easily understand the other.

This tutorial grew out of a custom batched k-means implementation I had to write, which I will use as an illustrative example. Normally, k-means libraries are optimized to run a single instance of the algorithm on an extremely large dataset. However, in our project we needed to cluster millions of individual small datasets in parallel, which is not something we could find an off-the shelf library for.

This article is split into four main sections: CUDA Basics, Parallelized K-means Algorithm, Python Numba, and CUDA C. I will try to keep the material relatively self-contained and briefly review concepts as needed.

CUDA Basics

CPUs are designed for fast sequential processing and GPUs are designed for massive parallelism. For our algorithm, we needed to run k-means on millions of small independent datasets, which lends itself perfectly to the GPU implementation.

Our implementation will be using CUDA, short for Compute Unified Device Architecture, which is a C library and computing platform developed by NVIDIA to leverage GPUs for parallel computation.

To understand GPU kernels and device functions, here are some definitions:

A thread is a single sequence of instructions that can be executed independently.

A core is a processing unit that can execute a single thread.

A warp is the smallest thread scheduling unit. Each warp consists of 32 threads and schedules them onto cores.

A multiprocessor, also referred to as a Streaming Multiprocessor (SM) is made up of a fixed number of cores.

A GPU kernel is a function that is designed to be executed in parallel across many GPU threads. Kernel functions are be invoked by the CPU and executed on the GPU.

A device function is a function that can be called by a kernel function or another device function. This function is executed on the GPU.

These kernels are organized into a hierarchy of grid, blocks, and threads. In the context of a GPU, each thread is connected to a single core and executes a copy of the kernel.

GPUFunction[(x, y), z](Data Structure). Image by Author.

A block is a collection of threads that runs on one multiprocessor.

A grid is an abstract array to organize blocks of threads. Grids are used to map kernel instances to threads by index.

On the GPU, there are two types of memory: global memory and shared memory.

Global memory is stored on DRAM (Dynamic Random Access Memory). All threads can access its contents at the cost of high memory latency.

Shared memory is stored on cache and is private to threads within the same block. It can be accessed much faster than global memory.

Parallelized K-Means Algorithm

Now let’s introduce the k-means algorithm. K-means is an unsupervised clustering algorithm that partitions a dataset into k distinct, non-overlapping clusters. Given a set of data points, we first initialize k centroids, or the starting cluster centers:

Centroid Initialization (k=3). Image by Author.

Then, after choosing initial centroids, iteratively perform two steps:

  1. Assignment Step: Each data point is assigned to its closest centroid based on Euclidean distance.
  2. Update Step: The centroid locations are reassigned to the mean of all the points assigned to that centroid in the previous step.

These steps are repeated until convergence, or when centroid locations no longer change significantly. The output is the set of k cluster centroid coordinates together with an array marking cluster indices (called labels) for each of the original data points.

K-means Algorithm (k = 3). Image by Author.

For a large dataset, centroid initialization can significantly affect the algorithm output. Therefore, the program tries multiple initial centroids, called initial seeds, and returns the result from the best seed. Each seed is selected from the initial dataset without replacement–meaning no initial centroid will be repeated. The optimal number of seeds for our algorithm is one third of the number of datapoints. In our program, because we run k-means on individual rows of 100 data points, the optimal number of seeds would be 33.

In our k-means function, the one million rows are represented by blocks while the threads represent the seeds. Threads in a block are organized into warps, the smallest thread scheduling unit in the hardware architecture. Each warp consists of 32 threads, making it optimal to set block sizes as multiples of 32. Each block then outputs the data from the seed that has the smallest inertia, measured by the sum of Euclidean distance between cluster centers and their assigned points.

Parallelized K-means — GPU side. Image by Author.

Linked here is the Colab where you can follow along in either Python or C.

global variables: numRows, lineSize, numClusters
def hostKMeans:
inputData = initializeInputData(numRows, lineSize)
outputCentroids = createEmptyArray(numRows, numClusters)
outputLabels = createEmptyArray(numRows, lineSize)

sendToDevice(inputData, outputCentroids, outputLabels)
cuda_kmeans[blockDimensions, threadDimensions](inputData, outputCentroids, outputLabels)
waitForKernelCompletion()
copyFromDevice(outputCentroids, outputLabels)

In our k-means algorithm, we begin by setting global variables. We will need to reference them from both the CPU and GPU.

While global variables are accessible from the kernel, the kernel cannot directly return values to the host. To circumvent this limitation, the CPU portion of our code passes two empty arrays to the kernel alongside the input data. These arrays will be used to copy the final centroid and label arrays back to the CPU.

With our data structures instantiated, we invoke the kernel, defining the grid and block dimensions as a tuple. The call to the kernel includes passing in our data, memory allocation for centroids and labels, and a state for the initial random centroid initialization.

def KMeansKernel(data, outputCentroids, outputLabels)
row = currentBlock()
seed = currentThread()

sharedInputRow = sharedArray(shape=(lineSize))
sharedInertia = sharedArray(shape=(numSeeds))
sharedCentroids = sharedArray(shape=(numSeeds, numClusters))
sharedLabels = sharedArray(shape=(numSeeds, lineSize))

sharedInputRow = data[row]

synchronizeThreads()
if seed == 0
centroids = initializeCentroids(data)
synchronizeThreads()

KMeansAlgorithm(sharedInputRow, sharedCentroids, sharedLabels)

sharedInertia[Seed] = calculateInertia(sharedInputRow, sharedCentroids, sharedLabels)

synchronizeThreads()
if seed == 0
minInertiaIndex = findMin(Inertia)
sharedOutputCentroids = centroids[minInertiaIndex]
sharedOutputLabels = labels[minInertiaIndex]

Once on device (aka the GPU), our code now exists simultaneously on the entire grid. To figure out where we are on the grid, we access the block and thread indices.

On the GPU, memory defaults to global memory which is stored on DRAM. Shared memory is private to threads within the same block.

By transferring the single row of data (that all threads in a block must read from) to shared memory, we cut down on memory access time. In the cuda_kmeans function, we create shared memory to store the row of data, centroids, labels, and the accuracy measure of each seed, called inertia.

In our program, each thread corresponds to one seed, and all threads in a block work on the same row of data. For each block, one thread sequentially creates 32 seeds and aggregates their outcomes into a single data structure for the other threads in the block.

When a subsequent step in the algorithm relies on this aggregation to have been completed, the threads must be synchronized using the built-in CUDA syncthreads() function. NB: one must be very careful about the placement of syncthreads() calls, since attempting to synchronize threads when not all of them have had a chance to complete can lead to deadlocks and hanging of the entire program.

Our kernel function, which is described in pseudocode below, is called cuda_kmeans. This function takes care of arranging the process outlined above, making space for the results from all 32 seeds and selecting the best seed for the final output in centroids and labels.

def KMeansDevice(dataRow, centroids, labels)
seed = currentThread()
centroidsRow = centroids[seed]
labelsRow = labels[seed]

centroidsRow = sort(centroidsRow)
yardStick = computeYardstick(sortedCentroids)

oldCentroids = localArray(shape=(numSeeds, numClusters))

for iteration in range(100):
if converged(oldCentroids, centroidsRow)
break
oldCentroids = copy(centroidsRow)
assignLabels(dataRow, centroidsRow, labelsRow)
updateCentroids(dataRow, centroidsRow, labelsRow)

From cuda_kmeans we then call the actual k-means algorithm, passing the newly instantiated shared memory. In our k-means algorithm, we start by picking the initial centroids and then sorting them from smallest to largest. We iteratively assign the data points to the nearest centroid and update centroid locations until convergence.

To be able to determine whether convergence has been achieved, we use a helper function called find_yard_stick. This function calculates and returns the smallest distance between two initial centroids (yard_stick). Our convergence condition is met when none of the centroids have moved by more than yard_stick times epsilon during a single iteration.

After convergence, we return to cuda_kmeans. Here, we determine the best seed by calculating the squared Euclidean distance between each centroid and its data points. The seed with the most effective grouping–indicated by the smallest inertia–is considered the best. We then take the centroids and labels from this seed and copy them to a single row in our output arrays. Once all blocks are done, these outputs are then copied back to the host (CPU).

Data Transfers During K-means Algorithm. Image by Author.

An Intro to Numba

The easiest way to design custom kernels is with Numba. Numba is a Python library that can be used to compile Python code into CUDA kernels.

Levels of Abstraction. Image by Author.

Under the surface, Numba interfaces with CUDA. To parallelize your code, Numba compiles your designated GPU code into a kernel and passes it to the GPU, dividing the program logic into two main parts:

  1. CPU level code
  2. GPU level code

Using Numba, you separate and hand off the sequential and parallelizable parts of code to the CPU and GPU. To compile a function for the GPU, the programmer uses the @cuda.jit decorator above the function definition, thereby transforming this function into a kernel which is called from the CPU (host) but is executed in parallel on the GPU (device).

Python Numba

Link to the Colab.

Numba serves as a bridge between Python code and the CUDA platform. Because the Python code is nearly identical to the algorithm pseudocode above, I am only going to provide a couple of examples of key relevant syntax.

cuda_kmeans[(NUM_ROWS,), (NUM_SEEDS,)](input_rows, output_labels, output_centroids, random_states)

After instantiating the necessary global variables and data structures, we can call the kernel, cuda_kmeans, from the host. Numba requires two tuples for the dimensionality of the blocks and threads. Because we will be using single-dimensional blocks and threads, the second index in each tuple is empty. We also pass in our data structures and an array of random states for random seed instantiation.

@cuda.jit()
def cuda_kmeans(input, output_labels, output_centroids, random_states):
row = cuda.blockIdx.x
seed = cuda.threadIdx.x
shared_input_row = cuda.shared.array(shape=(LINE_SIZE), dtype=np.float32)
shared_inertia = cuda.shared.array(shape=(NUM_SEEDS), dtype=np.float32)
shared_centroids = cuda.shared.array(shape=(NUM_SEEDS, NUM_CLUSTERS), dtype=np.float32)
shared_labels = cuda.shared.array(shape=(NUM_SEEDS, LINE_SIZE), dtype=np.int32)
if seed == 0:
get_initial_centroids(shared_input_row, shared_centroids, random_states)
cuda.syncthreads()

...

kmeans(shared_input_row, shared_labels, shared_centroids)

We use the Numba @cuda.jit() decorator to mark the for GPU compilation. Using the notation cuda.blockIdx.x and cuda.threadIdx.x we obtain the current index of the kernel. The shared arrays are instantiated using cuda.shared.array with two arguments, shape and the type, both of which must be known at compile time. After getting centroids and filling the row with data, we call the kmeans function, populate shared arrays and make a call to cuda.syncthreads().

@cuda.jit(device=True)
def kmeans(data_row, output_labels, output_centroids):
seed = cuda.threadIdx.x
labels_row = output_labels[seed]
centroids_row = output_centroids[seed]

...

old_centroids = cuda.local.array(shape=(NUM_CLUSTERS), dtype=np.float32)

for iteration in range(NUM_ITERATIONS):
if iteration > 0:
if converged(centroids_row, old_centroids, yard_stick * EPSILON_PERCENT, iteration):
break
# Assign labels and update centroids

K-means is a device function because it is called from the kernel function. Therefore, we must specify device=True in the decorator: @cuda.jit(device=True). The k-means function then gets the current row for labels and centroids and runs until convergence.

With just an extra dozen lines of code and a little bit of effort, your Python code can become an optimized kernel ready for parallel use.

Our parallelized k-means does cut down our compute time drastically–however, wrapping and compiling a high level language like Python is not necessarily optimal. Curious to see if writing the code in C would speed up our project, I dove into the realm of CUDA C.

An Intro to C

In Python, memory and type allocation are automatic, whereas in C, memory can be allocated on the stack or the heap, both of which require explicit type declaration and are allocated a fixed amount of memory. Stack memory is automatically allocated and freed by the compiler, while heap memory is manually allocated at runtime using functions like malloc(), with the programmer responsible for its deallocation.

Pointers are tools that hold the memory addresses of variables. The type of data the pointer refers to is defined during declaration. Pointers are specified using an asterisk (*). To retrieve the address of a variable, called referencing, you use an ampersand (&). To access value from a pointer, you dereference the pointer again using an asterisk.

A double pointer, or a pointer to a pointer, stores the address of a pointer. This is useful for modifying addresses of other pointers when passing arrays to functions. When passing arrays to functions, they’re passed as pointers to their first element, without size information, relying on pointer arithmetic to index through the array. To return an array from a function, you would pass the address of a pointer using & and receive it with a double pointer **, allowing you to modify the address of the original pointer, thereby passing the array.

int var = 100; // declare type
int *ptr = &var; // use of a pointer and reference
int **double_ptr = &ptr; // example of double pointer
printf(“Dereference double_ptr and ptr: %d %d \n:”, **double_ptr, *ptr)
int *ptr = 100; // initialize pointer to int type

CUDA C

Link to the Colab.

CUDA is a computing platform that leverages NVIDIA GPUs to parallelize complex computational problems. Building on your newfound mastery of C (joking), the CUDA C code structure is literally identical to the pseudocode structure we stepped through.

On the CPU side, we set up some constants that tell the algorithm what to expect, import libraries, initialize our variables, and define some macros.

#define NUM_ROWS 00000        // y dimension of our data set, number of blocks
#define LINE_SIZE 100 // x dimension of our data set
#define NUM_ITERATIONS 100 // max number of iterations
#define NUM_CLUSTERS 3 // We are running k = 3
#define MAX_INPUT_VALUE 100 // Upper bound on data
#define NUM_SEEDS 32 // Number of seeds/threads is 1/3 of LINE_SIZE
#define EPSILON_PERCENT 0.02 // Condition for convergence

void initInputData(float** input) {
srand(1);
// allocate memory for the data

... // initialize data using malloc and rand
// allocate memory on GPU
cudaMalloc(input, NUM_ROWS * LINE_SIZE * sizeof(float));
// copy memory from CPU sample_data to GPU memory
cudaMemcpy(*input, sample_data, NUM_ROWS * LINE_SIZE * sizeof(float), cudaMemcpyHostToDevice);
free(sample_data);
}

int main() {
float* inputData; // initialize input data, dimensions will by NUM_ROWS x LINE SIZE
initInputData(&inputData); // dereference and pass to function
// initialize output labels and centroids
cudaExtent labelsExtent = make_cudaExtent(sizeof(int), LINE_SIZE, NUM_ROWS);
cudaPitchedPtr outputLabels; // create the pointer needed for the next call
cudaMalloc3D(&outputLabels, labelsExtent); // allocate memory on GPU

cudaExtent centroidsExtent = make_cudaExtent(sizeof(float), NUM_CLUSTERS, NUM_ROWS);
cudaPitchedPtr outputCentroids; // create the pointer needed for the next call
cudaMalloc3D(&outputCentroids, centroidsExtent); // allocate memory on GPU
cuda_kmeans <<<NUM_ROWS, NUM_SEEDS>>> (inputData, outputLabels, outputCentroids);
cudaDeviceSynchronize();

... // copy output from device to back to host
}

Let us break the differences down.

The main function starts by initializing the data by creating a pointer and passing it to initInputData as an address to a pointer. The function receives inputData as a pointer to a pointer (float** input), which allows the function to modify the address held by the original pointer. Input is then pointed to a GPU memory address that is initialized using cudaMalloc and filled using cudaMemcpy, copying data from the temporary host array sample_data, already populated with random numbers.

Then, the code allocates memory on the device to hold the results from our k-means function. The function uses make_cudaExtent to create a cudaExtent object whose purpose is to encapsulate the dimensions of the multidimensional array.

The type cudaPitchedPointer is used to define a pointer that can address this pitched memory space. This type of pointer is specifically designed to work with memory allocated by cudaMalloc3D, which takes both a cudaPitchedPtr and cudaExtent object to allocate the linear memory on the GPU.

cuda_kmeans <<<NUM_ROWS, NUM_SEEDS>>> (inputData, outputLabels, outputCentroids);

Going into the GPU code, we define the grid such that each block corresponds to a row of data and each thread corresponds to a seed.

The seeds are initialized by a single thread in each block, ensuring entirely different seeds.

__global__ void cuda_kmeans(float* input, cudaPitchedPtr outputLabels, cudaPitchedPtr outputCentroids) {
int row = blockIdx.x;
int seed = threadIdx.x;

// shared memory is shared between threads in blocks
__shared__ float input_shm[LINE_SIZE];
__shared__ float error_shm[NUM_SEEDS];
__shared__ float seed_centroids[NUM_SEEDS][NUM_CLUSTERS];
__shared__ int seed_labels[NUM_SEEDS][LINE_SIZE];

... // get a single row of data
... // populate input_shm
... // populating the struct core_params
// the actual k-means function
kmeans(core_params);

// find seed with smallest error
calcError(core_params);
__syncthreads();
if (seed == 0) {
int* labels_line = LABELS_LINE(outputLabels, row);
float* centroids_line = CENTROIDS_LINE(outputCentroids, row);
labels_line[threadIdx.x] = seed_labels[seed][threadIdx.x];
centroids_line[threadIdx.x] = seed_centroids[seed][threadIdx.x];
}
}

The CUDA C code uses shared memory for the data, centroids, labels, and errors. However, unlike Python, the code takes the pointers to shared memory and stores them in a struct, which is just a method to pass variables en masse. Finally, cuda_kmeans calls the actual k-means algorithm and passes core_params.

__device__ void kmeans(core_params_t& core_params) {
DECLARE_CORE_PARAMS(core_params);
getInitialCentroids(core_params);
sort_centroids(centroids, num_clusters);
float yard_stick = findYardStick(core_params);
float* oldCentroids = (float*)malloc(NUM_CLUSTERS * sizeof(float));
struct work_params_t work_params;
work_params.min = find_min(line, LINE_SIZE);
work_params.max = find_max(line, LINE_SIZE);
work_params.aux_buf1 = (int*)malloc(NUM_CLUSTERS * sizeof(int));
work_params.aux_buf2 = (int*)malloc(NUM_CLUSTERS * sizeof(int));
work_params.aux_buf3 = (float*)malloc(NUM_CLUSTERS * sizeof(float));
for (int iterations = 0; true; iterations++) {
bool stop = (iterations > 100) || (iterations > 0 && (converged(core_params, oldCentroids, yard_stick * EPSILON_PERCENT)));
if (stop)
break;
memcpy(oldCentroids, core_params.centroids, NUM_CLUSTERS * sizeof(float));
getLabels(core_params);
getCentroids(core_params, work_params);
}
free(work_params.aux_buf1);
free(work_params.aux_buf2);
free(work_params.aux_buf3);
free(oldCentroids);
}

On the device function, before anything, we extract the values from the core_params struct into variables using the DECLARE_CORE_PARAMS macro.

Then, we run the same k-means algorithm as in Python, with the only difference being we pass the struct instead of variables and have to manage memory, pointers, and types.

Benchmarks

To compare our algorithms against non-parallelized k-means, we import the scikit-learn k-means module.

For our benchmark, we run 100,000 rows by 100 columns with three clusters. Because scikit-learn doesn’t have a parallelized k-means for different rows, we run the rows sequentially in a for loop.

For our benchmarks in colab, we use the free T4 GPU Colab instance.

Image by Author.

The results are good — the Python Numba code is two orders of magnitude faster than non-parallelized CPU code and the CUDA C code is three orders of magnitude faster. The kernel functions are easily scalable and the algorithm can be altered to support clustering in higher dimensions.

Note that the randomized initial centroid generation in both the C and Python algorithms is not fully optimized to use all cores. When improved, the Python algorithm may very well approach the C code run time.

Runtimes based on free Colab T4 GPU on 11/23/2023. Image by Author.

After running the k-means function a hundred times on different datasets and recording the resulting times–we notice that the first iteration is significantly slower due to the time it takes to compile both C and Python in Colab.

Conclusion

Now you should be ready to write your own custom GPU kernels! One remaining question may be — should you use CUDA C or Numba for parallelized data processing workloads? It depends. Both are vastly faster than off-the-shelf scikit-learn. Even though in my case the CUDA C batched k-means implementation turned out to be about 3.5x faster than an equivalent written using Numba, Python offers some important advantages such as readability and less reliance on specialized C programming skills in teams that mostly work in Python. Additionally, the runtime of your specific implementation will depend on how well you have optimized your code to e.g. not trigger serialized operations on the GPU. In conclusion, if you are new to both C and parallelized programming, I would suggest starting with Numba to prototype your algorithm, and then translate it into CUDA C if you need additional speedup.

References

  1. Scikit-learn: Machine Learning in Python, Pedregosa et al., JMLR 12, pp. 2825–2830, 2011.
  2. NVIDIA, Vingelmann, P. & Fitzek, F.H.P., 2020. CUDA, release: 10.2. 89, Available at: https://developer.nvidia.com/cuda-toolkit.
  3. Lam, Siu Kwan, Antoine Pitrou, and Stanley Seibert. “Numba: A llvm-based python jit compiler.” Proceedings of the Second Workshop on the LLVM Compiler Infrastructure in HPC. 2015.
  4. Harris, C.R., Millman, K.J., van der Walt, S.J. et al. “Array programming with NumPy.” Nature 585, 357–362 (2020). DOI: 10.1038/s41586–020–2649–2. (Publisher link).

--

--