Google Cloud Platform

Build Robust Google BigQuery Pipelines with Python: Part I

Apache Beam vs. Google BigQuery API

Chubing Tripepi
Towards Data Science
8 min read · Mar 14, 2021



This is Part I of a three-part series on interacting with BigQuery in Python to build data pipelines.

  • Part I. Apache Beam vs. Google BigQuery API
  • Part II. BigQuery Struct in Python
  • Part III. BigQuery and Google Sheet: the Hidden Complexity

Summary

At Spotify, we have many teams of software engineers and data engineers who use Scala or Java to bring large volumes of data into our Google Cloud Platform (GCP) ecosystem for users and researchers to consume. However, most of the data are not ready to be used immediately; cleaning and reshaping steps are needed before we can do our analysis properly. It is important for us to have a robust pipeline that can handle day-to-day data cleaning needs while also applying business logic and machine learning methods. Python has the unique advantage of being both agile and powerful, which is why I have invested a lot of time and energy in the past year to come up with an optimized solution for building BigQuery data pipelines in Python. Thanks to the open source community, there are a lot of resources out there with great tips and guidance. However, it still took me some time, and several rounds of trial and error, to find the right information. I hope this article will make your life easier if you are also trying to use BigQuery with Python.

In this post, I will give a quick overview of BigQuery, and discuss two of the most commonly used Python APIs that can interact with BigQuery.

Table of Contents

  1. BigQuery Overview
  2. Apache Beam BigQuery Python I/O: Implementations, Pros, Cons
  3. Google BigQuery API in Python: Implementations, Pros, Cons

BigQuery Overview

BigQuery is a fully-managed, serverless data warehouse that enables scalable analysis over petabytes of data. It is a Platform as a Service that supports querying using ANSI SQL. It also has built-in machine learning capabilities. BigQuery was announced in May 2010 and made generally available in November 2011. (Wikipedia)

In GCP, the two most common places to store data are BigQuery and Google Cloud Storage (GCS) buckets. A GCS bucket acts more like a storage place, while BigQuery allows users to run analysis right away in the UI. There are many ways to interact with BigQuery. The easiest and most obvious way is through BigQuery's interactive Console, which lets users query tables with either standard SQL or legacy SQL. In the Cloud Console and the client libraries, standard SQL is the default. In the bq command-line tool and the REST API, legacy SQL is the default. You can easily switch between the two in the query settings or by passing a parameter in the API calls.
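For example, with the Python client library (covered in more detail below), you can switch the dialect per query through the job configuration. This is a minimal sketch; the table reference is just an illustration using a public dataset:

from google.cloud import bigquery

client = bigquery.Client(project='YOUR_PROJECT')

# Standard SQL is the default for the client library; setting use_legacy_sql=True
# switches this particular query to the legacy dialect.
job_config = bigquery.QueryJobConfig(use_legacy_sql=True)
rows = client.query(
    'SELECT word FROM [bigquery-public-data:samples.shakespeare] LIMIT 5',
    job_config=job_config).result()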

Google recently published a new view with added features, such as multiple editor tabs, which is a welcome change among users. However, there are several bugs that still need to be fixed. First, for daily partitioned tables, only the most recent partition's schema can be viewed; the schema and preview functions do not work for earlier partitions. Another example is that the delete table function only allows the user to delete the most recent partition, yet it looks as if the user deleted everything in the dataset! Fortunately, that's not actually the case: a refresh will show that only the latest partition was deleted.

Screenshots: BigQuery Console current view and legacy view

Apache Beam BigQuery Python I/O

I initially started off the journey with the Apache Beam solution for BigQuery via its Google BigQuery I/O connector. When I learned that Spotify data engineers use Apache Beam in Scala for most of their pipeline jobs, I assumed it would work for my pipelines as well. It turned out not to be the case, but I have no regrets, since I learned a lot about Apache Beam along the way.

The Beam SDKs include built-in functions that can read data from and write data to Google BigQuery tables. To install, run pip install 'apache-beam[gcp]' in your Terminal (the quotes keep some shells from interpreting the square brackets).

Import Packages

import apache_beam as beam
import json
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

Read from BigQuery

To read data from BigQuery with Beam, you have a couple of options. Apache Beam is not my favorite method to read data from BigQuery; I much prefer the Google BigQuery API client, because it can download data and convert it to a Pandas data frame directly. But for your reference, you can either read from a table directly:

# project-id:dataset_id.table_id
table_spec = 'YOUR_PROJECT:YOUR_DATASET.YOUR_TABLE_ID'

my_column = (
    p
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['my_column']))

Or pass a SQL query:

my_column = (
    p
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT my_column FROM '
              '`YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE_ID`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['my_column']))

Data Formatting

Now that you have retrieved the data, you can do all kinds of fun stuff with it in Python: manipulate it, apply ML models, and so on. But here are some pointers for the final processing steps before uploading the table back to BigQuery.

  • Timestamp
    Format the Timestamp column as follows:
my_table['my_timestamp'] = my_table['my_timestamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  • Data Types
    Convert Float, Integer, Timestamp, or Date columns to String before passing them into Beam, and then specify the data type in the schema file. Please note that the Timestamp/Date data types do not accept empty strings.
my_table = my_table.applymap(str)
  • Data Structure
    The data frame needs to be converted to a list of dictionaries before it can be passed to Beam I/O. Beam is designed to process large volumes of data, so it works at the row level; splitting the data into rows lets Beam distribute the work when the dataset is huge.
my_table = my_table.to_dict("records")
  • NULL Value
    Python's None (NULL) type cannot be passed as anything other than String in Apache Beam. When passed as a String, you will see an actual 'None' string in BigQuery instead of NULL. As confusing as it is, converting None to an empty string resolves the issue and produces actual NULLs in BigQuery (see the combined sketch after this list).
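Putting these pointers together, a minimal prep sketch might look like the following (my_table and my_timestamp are the placeholder names used above; adjust to your own columns):

# Format the timestamp column as a string that Beam can pass through.
my_table['my_timestamp'] = my_table['my_timestamp'].apply(
    lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))

# Replace None/NaN with empty strings, then cast everything to String.
my_table = my_table.fillna('').applymap(str)

# Convert the data frame to a list of dictionaries so Beam can process it row by row.
my_table = my_table.to_dict("records")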

Once your data frame is prepped for data types and converted to a list of dictionaries, the object is ready to be uploaded to BigQuery.

Upload to BigQuery

In order to upload the data to BigQuery, we need to first define the schema. Here is a template to generate the schema file in JSON.

def make_sink_schema():
    mapping = {
        "string": ["STRING", "This is my string."],
        "timestamp": ["TIMESTAMP", "This is my timestamp."],
        "numeric": ["FLOAT", "This is my float."],
    }
    mapping_list = [{"mode": "NULLABLE", "name": k, "type": mapping[k][0], "description": mapping[k][1]} for k in mapping.keys()]

    return json.JSONEncoder(sort_keys=True).encode({"fields": mapping_list})
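As a quick sanity check, the helper above returns a JSON string like the illustrative output below; parse_table_schema_from_json (imported earlier) converts it into the TableSchema object the BigQuery sink expects:

my_schema = make_sink_schema()
print(my_schema)
# Illustrative output (truncated):
# {"fields": [{"description": "This is my string.", "mode": "NULLABLE", "name": "string", "type": "STRING"}, ...]}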

You can also generate schema files in Avro. But here we use JSON as an example. Once you have the schema ready, you can use the following template to upload the table to BigQuery.

def run_beam(my_table, table_name, dataset_name, project_name, my_schema):

    argv = [
        '--project={0}'.format('YOUR_PROJECT'),
        '--job_name=YOUR_JOB_NAME',
        '--staging_location=gs://{0}/staging/'.format('YOUR_BUCKET'),
        '--temp_location=gs://{0}/temp/'.format('YOUR_BUCKET'),
        '--region=YOUR_REGION',
        '--service_account_email=YOUR_SERVICE_ACCOUNT',
        '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)

    # write table to BigQuery
    (p
     | 'Create Data' >> beam.Create(my_table)
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the values passed in as function arguments.
             table=table_name, dataset=dataset_name, project=project_name,
             # JSON schema created by the make_sink_schema function
             schema=parse_table_schema_from_json(my_schema),
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )

    return p

p = run_beam(my_table, table_name, dataset_name, project_name, my_schema)
p.run().wait_until_finish()
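If you want to test the schema and dispositions without submitting a Dataflow job, one option (my own habit, not part of the original pipeline) is to run the same write locally with the DirectRunner. A minimal sketch, reusing my_table and my_schema from above:

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

# The DirectRunner executes the pipeline on your machine; temp_location is
# still needed because the BigQuery sink stages load files on GCS.
with beam.Pipeline(argv=['--project=YOUR_PROJECT',
                         '--temp_location=gs://YOUR_BUCKET/temp/',
                         '--runner=DirectRunner']) as p:
    (p
     | 'Create Data' >> beam.Create(my_table)
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         table='YOUR_TABLE_ID', dataset='YOUR_DATASET', project='YOUR_PROJECT',
         schema=parse_table_schema_from_json(my_schema),
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))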

Pros

Apache Beam's advantage is obvious when dealing with large volumes of data. Since it processes data at the row level, a dataset can be broken into multiple parts so that different workers can process them in parallel.

Cons

To me, the biggest issue with Apache Beam is its inflexibility when it comes to data types and NULL values. There are too many nuances and not many workarounds.

Google BigQuery API in Python

As I was coping with the cons of Apache Beam, I decided to give the Google BigQuery API a try, and I am so glad that I did! If you are not trying to run a big job with a large volume of data, the Google BigQuery API is a great candidate. To install, run pip install --upgrade google-cloud-bigquery in your Terminal.

Import Packages

from google.cloud import bigquery
import pandas as pd

Read from BigQuery

To read data from BigQuery, you can decide whether you want to read with your own credentials or with a service account's credentials. If you want to use your own credentials, the first step is to authorize your Google account by running gcloud auth application-default login in your Terminal, so the client library can pick up your application default credentials. Then you can proceed to your favorite Python editor:

# Construct a BigQuery client object.
client = bigquery.Client(project=YOUR_PROJECT)
query = """
SELECT name, SUM(number) as total_people
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'TX'
GROUP BY name, state
ORDER BY total_people DESC
LIMIT 20
"""
query_job = client.query(query).result().to_dataframe() # Make an API request and convert the result to a Pandas dataframe.

If you choose to use a service account credential instead, first save it as a local JSON file, and then construct the client object as follows:

client = bigquery.Client.from_service_account_json(
    json_credentials_path=PATH_TO_YOUR_JSON,
    project=YOUR_PROJECT)

Data Formatting

Google BigQuery API also has some quirks with data types, but what I like is that it processes NULL values better, and allows NULL values in all data types. Here are some additional tips:

  • Timestamp
    Format the Timestamp column as follows:
my_table['my_timestamp'] = my_table['my_timestamp'].apply(lambda x: x.strftime('%Y-%m-%dT%H:%M:%S'))
  • Date
    Format the Date column as follows:
my_table['my_date'] = pd.to_datetime(my_table['my_date'], format='%Y-%m-%d', errors='coerce').dt.date
  • String
    Empty strings will land in BigQuery as empty strings rather than NULL, so replace them with np.nan (this requires import numpy as np):
my_table = my_table.replace(r'^\s*$', np.nan, regex=True)

Upload to BigQuery

You can choose to auto-detect the schema when uploading the data to BigQuery (a sketch of that route follows the function below), or you can use the following function to define your own schema. Please note that the order of columns does NOT matter; the BigQuery client will look up the columns by name.

def make_schema():
    schema = [
        bigquery.SchemaField(name="string", field_type="STRING", description="This is my string."),
        bigquery.SchemaField(name="numeric", field_type="FLOAT", description="This is my float."),
    ]

    return schema
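If you would rather rely on auto-detection instead, a minimal sketch (my own shorthand, not code from the original pipeline) is to omit the schema and let the client infer the column types from the data frame's dtypes:

# Without an explicit schema, load_table_from_dataframe infers the column
# types from the data frame's dtypes.
client = bigquery.Client(project='YOUR_PROJECT')
job_config = bigquery.LoadJobConfig(write_disposition='WRITE_TRUNCATE')
job = client.load_table_from_dataframe(my_table, 'YOUR_PROJECT.YOUR_DATASET.YOUR_TABLE_ID',
                                       job_config=job_config)
job.result()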

Once you have the schema ready, you can use the following template to upload the table to BigQuery.

def run_bigquery(my_table, table_id, project_name, my_schema, table_description=None):
    """
    Upload a dataframe to BigQuery
    """
    client = bigquery.Client(project=project_name)

    job_config = bigquery.LoadJobConfig(schema=my_schema,
                                        destination_table_description=table_description,
                                        write_disposition='WRITE_TRUNCATE')

    job = client.load_table_from_dataframe(
        my_table, table_id, job_config=job_config)

    return job

job = run_bigquery(YOUR_TABLE, table_id, project_name, make_schema(), table_description)
job.result()
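Once the job completes, a quick sanity check (using the same client setup and table_id as above) is to fetch the destination table and confirm the row count:

# Confirm the load succeeded by checking the destination table's row count.
client = bigquery.Client(project=project_name)
table = client.get_table(table_id)
print('Loaded {} rows to {}'.format(table.num_rows, table_id))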

Pros

Google BigQuery API is easier to implement than Apache Beam. It is also more flexible with data types and handles the NULL values in Python better.

Cons

If the data volume is large, this is not a suitable solution. Consider Apache Beam in this case.

So here you go! I hope this article gives you some ideas about using BigQuery in Python. My next article in this series will be about a specific structure in BigQuery, the Struct, and the workarounds needed in the Python environment due to current limitations. See you soon! :)
