
BigQuery fetching + multiprocessing

Does multiprocessing improve the fetching speed of BigQuery API requests?

Image by author

The BigQuery Storage Read API can be used to fetch data from BigQuery tables. However, at the time I’m writing these lines, no benchmark has been published that combines this API with multiprocessing in order to speed up fetching.

In this article, I will show some of the research and benchmarks I’ve done in order to find “the most performant way to fetch data from BigQuery”.

The most common and easiest way to fetch data from BigQuery is to fetch it sequentially, using only one core of the machine/instance.

However, if you are using a computer or a GCP Compute Engine (GCE) instance that has multiple cores, you might wonder whether they could be used to fetch more data in parallel. There is a key concept to understand first:

Fetching will not necessarily be faster with more cores. The time needed to fetch data over the internet depends massively on the bandwidth available on your router/network.

Thus, having 200 cores on your device does not mean that fetching will be 200 times faster. What it does mean is that 200 processes can be created, each fetching a chunk of the table (and using less bandwidth per process), with the chunks finally merged into one table/DataFrame.
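Conceptually, the pattern looks like the minimal sketch below, where fetch_chunk is only a placeholder for a real BigQuery read and the merge step is a plain pandas concat:

    import multiprocessing as mp

    import pandas as pd

    def fetch_chunk(chunk_id):
        # Placeholder for a real BigQuery read: in practice, each worker
        # would open its own connection and download only its chunk.
        return pd.DataFrame({"chunk": [chunk_id], "value": [chunk_id * 10]})

    if __name__ == "__main__":
        chunk_ids = range(4)  # e.g. one chunk per core
        with mp.Pool(processes=4) as pool:
            parts = pool.map(fetch_chunk, chunk_ids)
        # Merge the partial results into a single DataFrame.
        df = pd.concat(parts, ignore_index=True)
        print(df)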

According to the GCE documentation, each vCPU provides an ingress bandwidth of about 2 Gb/s, up to a maximum of 6 vCPUs. So I built a tool, bqfetch, which fetches a table with one or multiple cores, given the number of cores to use. If nb_cores=-1, the algorithm uses the number of virtual CPUs available on the machine, creating one process per core and running them in parallel. The tool supports the multiprocessing, billiard and joblib parallel backends.

To determine whether BigQuery Storage is efficient with multiprocessing, I independently ran 6 fetches on a table of size 2GB, using 1 to 6 cores, and measured the time each run took. The actual code using the tool is shown below, after a short explanation.

Link to the bqfetch repo.

Medium article about the bqfetch fast-fetching library.

Note that in order to fetch huge tables (multiple TBs), we have to divide the whole table into smaller chunks that can fit into memory. The first step is therefore to divide the table using the chunks() method. We have to specify the column to use as an index, so that all the distinct values in this column can be divided into multiple chunks, as well as the size of each chunk to process, which has to be less than the available memory on the machine (a warning message is printed if it is not). The verbose mode prints information about memory and the number/size of the chunks.

In brief, the following example divides the whole table into small chunks of 2GB and fetches them successively using 1 to 6 cores, for demonstration purposes only. Note that in this case the chunk size and the table size are both 2GB, so chunks() returns only one chunk; in real use cases, you will specify a larger table and the method will return a list of chunks to process sequentially.
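Since the embedded snippet may not render here, the following is a sketch of that benchmark with bqfetch. The exact class and argument names (BigQueryTable, BigQueryFetcher, chunk_size_in_GB, parallel_backend) are my reconstruction from the behaviour described above, so check the repo linked earlier for the current API:

    import time

    from bqfetch import BigQueryFetcher, BigQueryTable

    # Placeholder identifiers: replace with your own project/dataset/table.
    table = BigQueryTable("my_project", "my_dataset", "my_table")
    fetcher = BigQueryFetcher("/path/to/service_account.json", table)

    # Divide the table into ~2GB chunks, using 'barcode' as the index column.
    chunks = fetcher.chunks(column="barcode", chunk_size_in_GB=2, verbose=True)

    # Fetch every chunk with 1 to 6 cores and time each run.
    for nb_cores in range(1, 7):
        start = time.time()
        for chunk in chunks:
            df = fetcher.fetch(chunk, nb_cores=nb_cores, parallel_backend="billiard")
        print(f"{nb_cores} core(s): {time.time() - start:.1f}s")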

Results on a 12-virtual-core MacBook Pro

Image by author: time to fetch a 2GB table as a pandas DataFrame, depending on the number of cores/processes (PC)

As shown, using multiple cores on a personal computer with a standard Wi-Fi connection does not improve performance; it is even worse. This is expected, because the internet bandwidth available on the network stays the same even when multiple cores are fetching. Moreover, each process has to create a new ReadSession in order to read data from BigQuery, so it takes more time: we have to initialise multiple connections in parallel and then merge the results once every process has finished.
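To make that per-process cost concrete, here is roughly what every worker has to repeat with the google-cloud-bigquery-storage client (a minimal sketch; the project, dataset and table names are placeholders):

    from google.cloud import bigquery_storage_v1
    from google.cloud.bigquery_storage_v1 import types

    def read_one_stream(project_id: str, dataset: str, table: str):
        # Every worker repeats this setup: creating the client and the
        # ReadSession is overhead paid once per process.
        client = bigquery_storage_v1.BigQueryReadClient()
        session = client.create_read_session(
            parent=f"projects/{project_id}",
            read_session=types.ReadSession(
                table=f"projects/{project_id}/datasets/{dataset}/tables/{table}",
                data_format=types.DataFormat.ARROW,
            ),
            max_stream_count=1,  # a single stream for this worker
        )
        reader = client.read_rows(session.streams[0].name)
        # Iterating the stream requires pyarrow for the ARROW format.
        return [row for row in reader.rows(session)]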

Results on a 30-virtual-core GCP Compute Engine instance

Image by author: time to fetch a 2GB table as a pandas DataFrame, depending on the number of cores/processes (GCE)

Using a GCE instance, the same issue occurs: performance keeps degrading as the number of cores increases. According to the GCE documentation, the network bandwidth should increase with the number of vCPUs, so what’s the issue? The reason is quite obscure to me, but I suspect it is due to a BigQuery limitation on the size of the row filter per request, which is only 10MB.
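As an illustration of why chunking runs into this limit (the column name and values here are hypothetical), the filter selecting a chunk’s index values is just a SQL predicate string passed as row_restriction, and it grows with the number of values in the chunk:

    from google.cloud.bigquery_storage_v1 import types

    # A hypothetical chunk defined by a large list of index values.
    index_values = [f"barcode_{i}" for i in range(500_000)]
    restriction = "barcode IN ({})".format(
        ", ".join(f"'{v}'" for v in index_values)
    )

    # The predicate is sent with the ReadSession; per the limitation
    # mentioned above, it is capped at 10MB.
    size_mb = len(restriction.encode("utf-8")) / 1024 ** 2
    print(f"row_restriction size: {size_mb:.1f} MB")

    read_options = types.ReadSession.TableReadOptions(row_restriction=restriction)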

Conclusion

  • At the time I’m writing these lines (Aug. 2021), Google offers no native implementation for using BigQuery Storage with multiprocessing efficiently. So the only way we have to achieve this is not as optimized as we would like, because of the limitations of the API. We will have to wait for further improvements from the API contributors before this method can, perhaps, deliver a performance gain.
  • You can run your own multiprocessing tests against the BigQuery API using the tool I implemented. The code is open source, so you are welcome to check it out or contribute.
