How to Create a Dataflow IoT Pipeline on Google Cloud Platform

Huzaifa Kapasi
Towards Data Science
7 min read · Feb 12, 2020


Data Pipeline Architecture from Google Cloud Platform Reference Architecture

Introduction

In this article, we will see how to configure a complete end-to-end IoT pipeline on Google Cloud Platform. You will learn:

How to create Device Registries in Cloud IoT Core

How to create Topics and Subscriptions

How to send messages to the device using the GCP Python IoT Client

How to set up Pub/Sub

How to set up a Cloud Dataflow pipeline from Pub/Sub to BigQuery

How to Set Up Cloud IoT Core with Device Registries and Pub/Sub

Go to https://console.cloud.google.com/, log in with your credentials, and search for IoT Core.

After selecting IoT Core, its main interface page will open as below.

We first need to create a registry to register the devices.

Click on Create Registry. The registry creation page opens as below.

· Give your registry a name. You can see the naming rules shown.

· Select the region where you want the data to be stored. It is best to keep the registry region close to the device source.

· You will then notice that it asks for the name of the Pub/Sub topic that will route messages from devices registered in the registry.

· So we will take a small detour: go to Pub/Sub and create the topic and subscription first.

Go to https://console.cloud.google.com/ in a new tab and search for Pub/Sub.

It will open the Pub/Sub landing page as shown below.

Since I have already created topics, they are displayed in the list.

Click on Create Topic. A new topic configuration page opens as below.

Give any topic ID you want. It will automatically create the topic name by adding the path to your project.

Click on Create Topic. Once you do, you will see the topic created on the Topics landing page.
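If you prefer the command line, the same topic can be created with the gcloud CLI. A minimal sketch, assuming a hypothetical topic ID my-iot-topic:

gcloud pubsub topics create my-iot-topic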

The topic forwards messages from the publisher device to its subscriptions. Any consumer holding a subscription can consume the messages.

So, let's create a subscription and associate it with the topic we created.

For that, click on Subscriptions on the Pub/Sub landing page. It will open the Subscriptions pane.

Click on Create Subscription. This will open the subscription configuration pane.

Give an ID of your choice. It will create the subscription name with the project name automatically.

Link your subscription with the topic name that was just created.

Leave the rest of the settings at their defaults and click on Create.

This will create the subscription as shown in the figure.
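The equivalent step from the gcloud CLI, again with hypothetical IDs (my-iot-sub, my-iot-topic):

gcloud pubsub subscriptions create my-iot-sub --topic=my-iot-topic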

Now let's go back to the IoT Core tab and associate the registry with the topic we created, in the Create a registry config pane.

Click on Create to create the registry.
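For reference, the registry can also be created from the gcloud CLI. A sketch, assuming a hypothetical registry ID my-registry, region us-central1, and the topic created above:

gcloud iot registries create my-registry \
  --region=us-central1 \
  --event-notification-config=topic=my-iot-topic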

Once the registry is created, the IoT Core landing page will look like below.

Click on the registry you created. It will open the registry page as below.

We now need to create a device instance and associate it with the registry we created. This completes the path of device creation, registry creation, and topic-subscription creation. We can then send messages from the GCP client to these devices; the messages will be routed to the topic and on via the subscription, from which they can be pulled with the APIs, as sketched below.
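As an illustration of that last point, here is a minimal sketch that pulls messages with the google-cloud-pubsub Python library; the project and subscription IDs are hypothetical placeholders:

from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

project_id = "yourprojectname"   # placeholder
subscription_id = "my-iot-sub"   # placeholder

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Synchronously pull up to 10 messages, print them, and acknowledge them.
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 10})
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))
ack_ids = [received.ack_id for received in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": ack_ids})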

Click on Devices, then the Create Device tab.

This will open the device configuration page. Give a device ID, leave the rest of the settings as they are, and click on Create. This will create a device instance associated with the registry.

Create GCP Client to Send Messages via MQTT Protocol

Download the Python scripts for the Google Cloud Platform implementation from:

https://github.com/GoogleCloudPlatform/python-docs-samples

Go to tree/master/iot/api-client/end_to_end_example/cloudiot_pubsub_example_mqtt_device.py

This client depends on the following Python libraries: argparse, datetime, json, os, ssl, time, jwt (PyJWT), and the Paho MQTT client. You can use pip install to add the relevant libraries to your Python environment, if needed.

The client file generates dummy temperature data messages and sends the telemetry to the device we created on IoT Core. The messages are further routed to the Pub/Sub topic, where we can view them, or subscribe and extract them.

Please note this is a baseline script; you can change it to suit your message requirements.
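To show the moving parts, here is a condensed sketch of what such a client does: it signs a JWT with the device's private key, connects to the Cloud IoT Core MQTT bridge, and publishes to the device's telemetry topic. All IDs are placeholders; the structure follows the sample script.

import datetime
import json
import ssl
import time

import jwt                       # pip install pyjwt cryptography
import paho.mqtt.client as mqtt  # pip install paho-mqtt

project_id, cloud_region = "yourprojectname", "us-central1"  # placeholders
registry_id, device_id = "my-registry", "my-device"          # placeholders

def create_jwt():
    # Cloud IoT Core authenticates the device with a JWT passed as the MQTT password.
    now = datetime.datetime.utcnow()
    token = {"iat": now, "exp": now + datetime.timedelta(minutes=60), "aud": project_id}
    with open("rsa_private.pem", "r") as f:
        private_key = f.read()
    return jwt.encode(token, private_key, algorithm="RS256")

client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
    project_id, cloud_region, registry_id, device_id)
client = mqtt.Client(client_id=client_id)  # paho-mqtt 1.x style, as in the sample
client.username_pw_set(username="unused", password=create_jwt())  # username is ignored
client.tls_set(tls_version=ssl.PROTOCOL_TLSv1_2)
client.connect("mqtt.googleapis.com", 8883)
client.loop_start()

# Publish one dummy temperature reading to the device's telemetry topic.
payload = json.dumps({"temperature": 21.5})
client.publish("/devices/{}/events".format(device_id), payload, qos=1)
time.sleep(2)
client.loop_stop()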

Execute the code

python cloudiot_pubsub_example_mqtt_device_liftpdm.py --project_id=yourprojectname --registry_id=yourregistryid --device_id=yourdeviceid --private_key_file=RSApemfile --algorithm=RS256

You can generate the RSA PEM files with the following OpenSSL commands. The private key stays with the client, which signs its JWT with it; the public key (rsa_public.pem) must be added to the device entry in IoT Core so the signature can be verified.

openssl genpkey -algorithm RSA -out rsa_private.pem -pkeyopt rsa_keygen_bits:2048

openssl rsa -in rsa_private.pem -pubout -out rsa_public.pem
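With the public key in hand, you could also have created the device from the gcloud CLI instead of the console. A sketch with hypothetical IDs (check gcloud iot devices create --help for the exact key-type value accepted by your gcloud version):

gcloud iot devices create my-device \
  --registry=my-registry \
  --region=us-central1 \
  --public-key=path=rsa_public.pem,type=rsa-pem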

You will see a different message, since I am publishing a different set of data; the message will be acknowledged (ACK'd) either way.

Now let's go to Pub/Sub and see the message.

Click on the topic ID you created.

Click on View Messages. You will be asked to select a subscription; pick the subscription we just created from the drop-down.
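You can also pull the same messages from the command line. A sketch, assuming the hypothetical subscription ID my-iot-sub:

gcloud pubsub subscriptions pull my-iot-sub --auto-ack --limit=5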

DataFlow Pipeline

Let's now look at creating a Dataflow pipeline from Pub/Sub to BigQuery.

Go to console.cloud.google.com/dataflow. The landing page looks as below.

Click on Create Job from Template.

Give the desired job name and regional endpoint. From the Dataflow template drop-down, select the Pub/Sub to BigQuery pipeline as below. Provide the name of the subscription we created, and the destination table in project:dataset.tablename format. You will also need to specify a temporary storage location in Google Cloud Storage, as shown below.
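The same job can be launched from the gcloud CLI using the Google-provided Pub/Sub Subscription to BigQuery template. A sketch with placeholder names (the table spec uses the dataset and table from this article):

gcloud dataflow jobs run my-pubsub-to-bq-job \
  --gcs-location=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
  --region=us-central1 \
  --staging-location=gs://my-temp-bucket/temp \
  --parameters=inputSubscription=projects/yourprojectname/subscriptions/my-iot-sub,outputTableSpec=yourprojectname:liftpdm_2.acc8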

You can find more details on table creation in BigQuery at https://cloud.google.com/bigquery/docs/tables and https://cloud.google.com/bigquery/docs/schemas

You can find more details on bucket creation in Cloud Storage at https://cloud.google.com/storage/docs/creating-buckets
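If you prefer to create both prerequisites programmatically, here is a sketch using the google-cloud-bigquery and google-cloud-storage Python libraries; the bucket name and the single-field schema are hypothetical and should match your telemetry JSON:

from google.cloud import bigquery  # pip install google-cloud-bigquery
from google.cloud import storage   # pip install google-cloud-storage

project_id = "yourprojectname"  # placeholder

# Create the dataset and the destination table with a schema matching the payload.
bq = bigquery.Client(project=project_id)
bq.create_dataset("liftpdm_2", exists_ok=True)
schema = [bigquery.SchemaField("temperature", "FLOAT")]  # hypothetical schema
table = bigquery.Table("{}.liftpdm_2.acc8".format(project_id), schema=schema)
bq.create_table(table, exists_ok=True)

# Create the bucket Dataflow will use for temporary files.
gcs = storage.Client(project=project_id)
gcs.create_bucket("my-temp-bucket")  # placeholder; bucket names must be globally unique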

Click on Run Job, and the job panel will look like below.

Now let's go to BigQuery and check whether the data is being streamed into our table.

NOTE: GCP does not allow you to pause and resume a Dataflow job; once you stop a job, you have to recreate it. Make sure you do stop the job when you are done, because it consumes considerable resources and can run up a huge bill.

The data is streamed into the table acc8 of dataset liftpdm_2.
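A quick way to check the same thing from Python, as a sketch (adjust the project, dataset, and table names to yours):

from google.cloud import bigquery  # pip install google-cloud-bigquery

bq = bigquery.Client(project="yourprojectname")  # placeholder
rows = bq.query("SELECT * FROM `yourprojectname.liftpdm_2.acc8` LIMIT 10").result()
for row in rows:
    print(dict(row))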

Once the data is in BigQuery, you can use it for downstream applications such as visualization and machine learning, and store the computed results back in BigQuery.

Conclusion

We looked at the steps to create IoT Core devices and registries and associate them with a topic.

How to create Pub/Sub topics and subscriptions.

How to send messages to Pub/Sub through the IoT Python client.

How to create a Dataflow pipeline from Pub/Sub to BigQuery.

What's Next

We will look into how to create closed loop communication back to the device with some actionable parameters.

References

https://cloud.google.com/iot/docs/samples/end-to-end-sample

https://cloud.google.com/dataflow/docs/guides/templates/provided-streaming


Huzaifa Kapasi holds a double MS (full-time research) from Warwick University and has 15+ years' experience in machine learning, AI, big data, cloud, and signal processing algorithms.