Want to use BigQuery? Read this

Lorenzo Peppoloni
Towards Data Science
10 min readNov 10, 2018

--

I recently started to use BigQuery and I must admit I fell in love with the DB…
This article is my attempt to explain the technology behind it, which is a requirement to efficiently utilise the DB in terms of cost and performance.

BigQuery is the public implementation of Dremel that was launched by Google to general availability.

Dremel is Google’s query engine and it is able to turn SQL queries into an execution tree which reads data from Google’s distributed filesystem. Dremel has high scalability and is able to return queries results within seconds (or tens of seconds) despite the size of the dataset.

BigQuery provides the core features of Dremel to third parties, via a REST API, a command line interface and a Web UI.

But BigQuery is a bit more than Dremel…

In fact, BigQuery leverages multiple technologies developed at Google.

First, it uses Borg (Google’s cluster management system) to allocate the compute capacity for the Dremel jobs. Dremel jobs read data from Google’s file systems using Jupiter, Google’s high-speed network, which exchange data at 10 Gbps.

Thanks to its architecture, BigQuery does not require indexes, the data is stored in a proprietary columnar format called (more on this later) on Colossus (Google’s file system) and each query performs a full scan of the targeted table.

The increase in load is managed mainly by adding servers. This is handled transparently for the user, who does not “add servers” or “use bigger machines” in the way they would using e.g. Redshift or Postgres.

The terms “no indexes” and “full scans” are not usually synonyms of “being fast”, so why is BigQuery so fast?

Mainly thanks to two factors:

  • Columnar storage: the Data is stored by columns and this makes it possible to achieve very high compression ratio and scan throughput.
  • Tree architecture: a tree execution architecture is used to dispatch queries and aggregate results across thousands of machines.

Columnar storage

Being an analytics DB, BigQuery storage format is optimised for accessing few columns for a very big amount of rows. For this reason, the storage is performed by columns, thus you only access fewer and different storage volumes (which is even faster since you can access them in parallel).

The data model is tightly related to protocol buffers, with required, repeated and optional fields.
To store information in a contiguous way, let’s introduce the concept of repetition level and definition level.

  • Repetition level: the level of the nesting in the field path at which the repetition is happening.
  • Definition level: how many optional/repeated fields in the field path have been defined.

Let’s make an example, let’s imagine we have the following table definition:

message Book {
required string title,
repeated string author,
repeated group price {
optional int64 discount,
optional int64 usd,
optional int64 eur,
}
}

and we have three records:

Book1:
author: "AAA"
title: "firstTitle"
price:
discount: 0
eur: 11
usd: 12
Book2:
author: "BBB"
author: "CCC"
author: "DDD"
title: "secondTitle"
Book3:
title: "thirdTitle"
price:
discount: 0
eur: 11
price:
discount: 1
eur: 11

Let’s compute the repetition and definition levels for each value. I will also add explicit null values for the missing optional fields.

Book1:
author: "AAA" R: 0, D: 1
title: "firstTitle" R: 0, D: 1
price:
discount: 0 R: 0, D: 2
eur: 11 R: 0, D: 2
usd: 12 R: 0, D: 2
Book2:
author: "BBB" R: 0, D: 1
author: "CCC" R: 1, D: 1
author: "DDD" R: 1, D: 1
title: "secondTitle" R: 0, D: 1
(price):
(discount: null) R: 0, D: 0
(eur: null) R: 0, D: 0
(usd: null) R: 0, D: 0
Book3:
title: "thirdTitle" R: 0, D: 1
(author: null) R: 0, D: 0
price:
discount: 0 R: 0, D: 2
eur: 11 R: 0, D: 2
(usd: null) R: 0, D: 1
price:
discount: 1 R: 1, D: 2
eur: 11 R: 1, D: 2
(usd: null) R: 1, D: 1

Repetition levels are always zero when there is no repetition, when a field is repeated, such as author in the second record, R is 1 because the repetition happens at the first repeated level, same for price in the third record.

Definition levels are quite straightforward, for example in the first record price.discount has 2 because both price and discount are defined. On the other hand, in record 3, the last null price.usd has D equal to 1, because price is defined, but price.usd isn’t.

Each column is stored as a set of blocks like:

compressed value, R, D

R and D are stored only when necessary and cannot be inferred. Null values can be inferred as, for them, D is always a number lower than the sum of repeated and optional fields in the field path (as it can be seen from the example).
From the stored information, each record can be easily reconstructed in parallel for each queried column.

For example, let’s consider the price.eur column. On disk we will have:

11 R: 0, D: 2
NULL R: 0, D: 0
11 R: 0, D: 2
11 R: 1, D: 2

Scanning the column, every time R is zero we are encountering a new record, while when R is greater than 0 we have repetitions in the same record. As explained before, D can be used for null values. Thus, traversing the column we obtain:

Book1:
price:
eur: 11
Book2:
price:
eur: null
Book2:
price:
eur: 11
price:
eur: 11

Column storage brings also storing advantages since it allows you to compress each column. Two classics solutions are bitmaps and run-length encoding (RLE).

Let’s imagine you have a column composed of n rows, with k distinct values. Using the previous example, you have the price.eur column with the following values (n = 10, k = 5)

[10.0, 10.0, 8.99, 8.99, 7.0, 6.0, 7.0, 6.0, 2.0, 2.0]

This column can be easily compressed with k bitmaps (one for each distinct value) of length n (row length), where you have a set bit in the position of a particular value if that value is in the row.

price.eur: 10.0 [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
price.eur: 8.99 [0, 0, 1, 1, 0, 0, 0, 0, 0, 0]
price.eur: 7.0 [0, 0, 0, 0, 1, 0, 1, 0, 0, 0]
price.eur: 6.0 [0, 0, 0, 0, 0, 1, 0, 1, 0, 0]
price.eur: 2.0 [0, 0, 1, 1, 0, 0, 0, 0, 1, 1]

The bitmaps can be stored instead of the actual column. The advantage is even bigger, if you think about selection, aggregation and projection patterns. In analytics DBs, queries (like the one below)

can be directly performed loading the bitmaps for the values = 4.0 and < 3.0 and performing a bit-wise AND.

The compression can be improved even more, using RLE. What you do in that case is representing the sequences of 0s and 1s. As an example, the first three bitmaps would turn into:

price.eur: 10.0 – 0,2 (0 0s, 2 1s, rest 0s)
price.eur: 8.99 – 2,2 (2 0s, 2 1s, rest 0s)
price.eur: 7.0 – 4,1,1,1 (4 0s, 1 one, 1 zero, 1 one, rest 0s)

As you can imagine, the efficiency of this kind of techniques is strongly dependant on different factors, such as rows order, column types and usage.
What Capacitor does, is trying to find a smart rows reordering with the aim of optimising (or at least sub-optimise) data storage and retrieval.

This choice has some downsides too. For example, it works badly for updates. Dremel deals with that by not supporting any update operation.

Coffee break (Photo by rawpixel on Unsplash)

Tree architecture for query execution

Each SQL statement takes as input one or multiple nested tables and their schemas and produces a nested table and its output schema.
The idea of the nested data structure is used to reduce the impedance mismatch (aka the mismatch between objects in the application code and DBs data representation).

A nested result is always produced even though no record constructor is specified in the query.

Let’s make an example with an SQL query that performs: projection (AS), selection (SELECT/FROM/WHERE) and within-record aggregation (COUNT — WITHIN).

The query will return something like:

Book: Book1
price:
discountCnt: 1
str: "AAA,firstTitle"

If you think about the nested records as a labelled tree, then:

  • Selection: WHERE prunes out branches that do not meet the conditions, while SELECT produces values at the same level as the most repeated field used in the expression.
  • Aggregation: COUNT/WITHIN performs within-record aggregation, counting the occurrences of discounts in the price field.

But how is the query performed?
The answer is: using a tree execution approach.

The root node receives the query, reads the table metadata and reroutes the query to the next level. At the bottom, the leaf nodes are the ones interacting with the distributed filesystem, retrieving the actual data, and propagating it back up in the tree.

As an example, let’s assume you execute:

The root node gets all the partitions (maybe some of you may be more familiar with the term shards) of which T is composed, than rewrites the query as

where R1iR1n are the results of the of the queries sent to the nodes at level one. In particular:

Where T1i is the tablet (shard of T) processed by server i in the first level of the execution tree.

The query is modified and pushed at the following level, until it reaches the leaves. The leaves read the actual column data from the partitions, then everything is propagated back to the root.

It is to be noted that the amount of data scanned by the leaves is the amount of data which affect the cost you pay for the query.

Regarding to partitioning, BigQuery determines the optimal number of partitions for a table upon loading the data, the number is then tuned based on data access and query pattern.

Query dispatcher

Usually, several queries are executed at the same time, a query dispatcher schedules queries and balances the load.

The amount of data processed in each query is usually larger than
the number of processing units available for execution (slots). A slot corresponds to an execution thread on a leaf server. Usually, what happens is that each slot is assigned to multiple tablets.
When executing queries, the dispatcher computes histograms of the tablets processing times and it can decide to reschedule tablets which take too long to be processed.

As far as replication is concerned, each tablet is usually three-way replicated, so if one of them is not accessible, the leaf server accesses another replica.

Let’s use some math to understand how this mechanism can be so fast in performing queries.

Let’s say you have a table with 50TB of data, 10 columns and 1000 partitions, taking into account the column storage previously discussed, you have 10 files times 1000 tablets.

Let’s assume you perform a SELECT *on the table.
If you have 2000 slots, each slot will have to read 5 files (10x1000/2000) of size 5GB each, for a total of 25GB which can be read in parallel.
Considering the speed of the Jupiter network, this data can be served in around 20 seconds.

Practical considerations

Let’s go through some practical considerations, that can be drawn from understanding the technology behind BigQuery.

  1. Minimise the amount of scanned data: never run a SELECT *, if this is true for analytics DBs in general, this is even truer for BigQuery and its pay-per-query policy. Carefully select the column you need. On top of that, you can apply several strategies to reduce the amount of data, such as using partitioned tables.
  2. No deletions: if you made a mistake, you cannot delete or rename a column… You can only add or relax a column’ s mode from REQUIRED to NULLABLE. However, you still CAN remove or rename a column. You can, for example, execute a query which selects all the columns except for the one you want to remove and target a new table with the query results. Or you can dump the existing table to Google Cloud Storage (GCS) and reimport it without the column you want to remove or rename. Each of these will come with the downside of cost (you are scanning the full table, you are creating new tables so you need additional storage).
  3. External sources: BigQuery can also target external data sources with its queries. The supported sources are Bigtable, Google Cloud Storage, and Google Drive. The data is loaded on-the-fly into the Dremel engine. This can be great, for example, if you want to directly query logs you store on GCS. On the other hand, targeting external data will result in lower performance compared to targeting native tables.

Conclusions: We had an in-depth look at how BigQuery works under the hood. In particular, how its storing format and query execution mechanism perform fast analytics queries on a huge amount of data. Then, we drew some practical considerations about how to optimise cost and queries performance in the light of what we learnt about BigQuery architecture.

If you enjoy the article and you found it useful, feel free to 👏 or share.

Cheers!

--

--

Tech enthusiast, life-long learner, with a PhD in Robotics. I write about my day to day experience in Software and Data Engineering.