Bioinformatics generates a lot of data. "Omics" in particular (genomics, transcriptomics, proteomics…) and its associated sequence data (such as NGS, next generation sequencing) can be computationally challenging. Processing it can consume a lot of CPU, memory and disk space. Many workloads that people may want to run can often not even be considered.
One reason for this is that many important omics tools have been designed for operation on a single machine, and not for distributed (cluster) processing. Understandably so, since distributed processing adds new levels of difficulty to the already tricky craft of software development. However, this leads to a situation where sometimes we are expected to have access to machines with 100s of GB of RAM in order to perform, for example, taxonomic classification for metagenomics (where we identify all the different species present in a DNA sample). And even people who have such a beefy machine can’t escape the fact that its memory puts a hard limit on the size of the analyses that can be run. In an age of affordable cloud computing services and commodity hardware, we should be able to do better. We would like to have the choice to run our computations on a cluster or in the cloud in an efficient way.
Spark
Apache Spark is a powerful framework for Big Data analysis that has seen wide acceptance in recent years. It combines the strengths of functional programming, the MapReduce programming model, and, for those who want it, the modern Scala programming language (Python, Java and R may also be used). It manipulates large distributed collections of data and contains a query optimization engine that helps get the most out of the hardware. It is also supported by major cloud providers such as AWS and Google Cloud out of the box, but of course you could also run it on your own machines (or machine). Given all these benefits, it seems we should try to run some of our heavy omics analysis on Spark.
However, we can’t simply wrap an existing tool such as BLAST in Spark and expect it to work. In distributed computing we have to decompose the problem into relatively independent parts in order to get benefits. When we can express an algorithm in such a way, then Spark can do the heavy lifting for us and run the workload on clusters smoothly. Of course, whether this is even possible depends in each case on the algorithm or problem being considered.
k-mer counting
To understand the fundamentals of this space, we implemented a tool for k-mer counting. K-mers are short fragments of DNA sequences of length k. Thus, we may speak of 3-mers, 4-mers, 31-mers, and so on. k-mer analysis forms the backbone of many omics methods, including genome assembly, quality control of short reads, genome size estimation, and taxonomic classification. _k-_mer counting seemed to us to be the simplest problem in this space that is meaningful to solve, but still computationally challenging, and a good Spark-based solution should be a good foundation for solving other omics related problems. Essentially, in a given dataset of DNA sequences, we have to identify each distinct _k-_mer and count how many times each one occurs. On a single machine, traditionally you might use a tool such as Jellyfish to solve this problem. Below is an example of what the output may look like for k=28 (the full table would have billions of rows).
ATAATAATAGGAAGGCATTTTCGGACGA 1
ATAATAGGAAGGCATTTTCGGACGATTA 8
ATAATAGGCAGCCATGCCCGGACGGTGC 1
ATAATGTAGGTTCCATATTCAGGACATC 1
ATAGGAAACGCTTCTCGGACGGAAAGGT 2
How would you count k-mers in a distributed way? We can’t simply use a big distributed array, since the number of potential 31-mers (for example) is huge: 4³¹. We can’t maintain that many counters in memory or on disk, and furthermore k might be much larger than 31. Only a tiny subset of these counters would be used in practice. With such sparse keys, binning will be essential. If you know your algorithms, you know that hash functions are generally used to distribute data into bins. This is how we create data structures such as maps and dictionaries. On Spark, this is the basis for partitioning. Good partitioning is one of the main keys to Spark performance, as it controls how the work is distributed to different worker machines. Bad partitioning leads to data skew, and under this situation you might have, for example, one worker node receiving a very large share of the data, overwhelming it. This would not only slow it down but also potentially force all the other nodes to wait for the slow node to finish. Furthermore, it might greatly increase the RAM requirement for that node (and thus all the nodes, since we can’t control where the big chunk ends up). And thus we’d be back where we started.
As an analogy, imagine that a distributed pipeline is literally a system of pipes, and your workload consists of balls that you want to send through the pipes. If there are many golf balls and just one bowling ball, all the pipes have to be very wide. But if all the balls are golf balls, narrow pipes would suffice. Thus, evenly partitioning the data will be key to efficient resource usage.
Splitting the workload
Without going into too much detail, the accepted way of hashing _k-mers into bins has been a method called minimizers_, and this is what tools like Jellyfish tend to use. (You might wonder why one couldn’t simply use the standard string hash that comes with the programming language. The main reason is that hashing each k-mer separately causes the size of the input data to explode by 50x or more in the intermediate stages before counting. In Spark this would cause lots of slow shuffling over the network. Minimizers avoid this by hashing many adjacent k-mers together, a property that we seek to keep.)
Minimizers work well for binning on a single machine, but can produce bins of wildly different sizes. For a distributed application, this is not ideal as we would have data skew. To counter this, we were able to develop a new hash function (a paper is in preparation), which replaces minimizers and produces a much more even partitioning. This can be measured by the size of the largest bin for a given total number of bins. Our test case is cow rumen metagenomics data, run SRR094926 from PRJNA60251. At around 1,000,000 bins, our "worst" bin was less than 1/100 of the minimizer case, as can be seen in the chart below (k = 28). This is very valuable, since the largest bin, by and large, dictates the memory requirement of this entire application.

This allows us to smoothly divide the workload into a large number of small partitions, and then simply traverse each partition and count the k-mers there. When we had applied this hash and also optimized other bottlenecks, compared with a leading tool for distributed k-mer counting (which uses minimizers), our runtime was somewhat faster and memory usage was around 10% of the competing tool when applied to a cow rumen metagenomic dataset. Such low memory usage allowed us to use, for example, "highcpu" nodes on Google Cloud, which have less RAM and which are cheaper than normal nodes per hour. The innovation resulted in cost savings. Alternatively, we could have used the normal nodes but analysed 10x more data without running out of resources. Thus we arrive at an efficient, economical, and scalable way of counting _k-_mers in a distributed fashion, and hopefully a basis for other useful tools in this space.
For example, for k=28 at this time we are able to count 2.94×10¹⁰ distinct k-mers (7.23×10¹⁰ total) in 1.5 h on 4 worker "highcpu" nodes, which between them had a total of 57.6 GB RAM and 64 CPUs. This works out to about 1.33 CPU-hours per billion k-mers.
Conclusion
Many lessons were learned during this journey, but our main insight at this point has been that finding a way to eliminate data skew is very helpful for Spark Performance. By doing this we were able to improve performance and reduce memory usage by 90%. Depending on the situation, going as far as to develop one’s own hash function or custom partitioning scheme may be well worth the investment. For _k-_mers in omics data, it can change the tradeoffs completely.
More generally, when we make the move from running an algorithm on a single machine to running on a cluster, completely new issues can arise. To get the best results from Spark, it is necessary to understand and address these issues as they come up. If these issues can be dealt with, Spark can be a very good choice for the distributed (cloud or cluster) processing of omics data.
Finally, I would like to acknowledge FastKmer by U.F. Petrillo et al (BMC Bioinformatics 2019) which provided some important inspiration and ideas for our k-mer counting implementation.
Update (November 2020): The code behind this work has, after further development, been released as part of the application Discount (available on GitHub). The hash function that I described above has now evolved into the "universal frequency ordering" for minimizers. A related paper, which describes our latest results and contains much more detail on this topic, is available on BioRXiv.