
From Raw Data to a Cleaned Database: A Deep Dive into Versatile Data Kit

A complete example using the Versatile Data Kit (a framework recently released by VMware) and Trino DB

Angelica Lo Duca
Towards Data Science
8 min read · Feb 11, 2022


VMware recently released a new open-source tool called Versatile Data Kit (VDK, for short), which makes it possible to manage data very quickly. The tool lets you ingest data in different formats into a single database with just a few lines of code.

In my previous article, I described a basic example of using VDK, including how to install it and get it running. To summarize, you should:

  • have a running database (external to VDK)
  • configure in VDK the interface to the database, including username, password, host, and port
  • define in VDK how to ingest data into the database, using SQL queries or more complex procedures implemented in Python.

Once the data is in the database, you can use it however you want, for example, to build dashboards or visualizations.

In this article, I focus on a complete example that goes from data ingestion all the way to data processing. The example uses Trino with a MySQL server to store the data and VDK to ingest the data into the database.

The following figure shows a possible architecture including VDK, Trino Server, and a MySQL Server:

A possible architecture including VDK, Trino Server and MySQL Server
Image by Author

In detail, the article is organized as follows:

  • Definition of the Scenario
  • Setup of the Database
  • Data Ingestion in VDK
  • Data Processing in VDK

1 Definition of the Scenario

The objective of the scenario is to analyze life expectancy in the different U.S. regions and compare it with each region's Gross Domestic Product (GDP). To achieve this objective, you need to download data on U.S. life expectancy and merge it with the associated GDP data.

This example shows how to extract and integrate data needed for this scenario through VDK.

We can use the following two datasets, both of which are available as CSV files:

  • U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015
  • U.S. State Life Expectancy by Sex, 2018

The previous datasets are released under the U.S. Government Works license.

In addition to the previous datasets, we can use the following additional datasets:

  • U.S. Gross Domestic Product by County
  • a list of U.S. states with their census regions and divisions

The previous two datasets are released by the U.S. Bureau of Economic Analysis and the U.S. Census Bureau, respectively, both providing open data under a public license.

The dataset U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015 contains 73,121 records on the life expectancy of Americans, broken down by state, county, and census tract, for the period 2010–2015.

The following figure shows an excerpt of the dataset:

Image by Author

The dataset U.S. State Life Expectancy by Sex, 2018 contains 156 records relating to the life expectancy of Americans in 2018, divided by state and sex (male, female, total). The following figure shows an excerpt of the dataset:

Image by Author

The dataset U.S. Gross Domestic Product by County contains 3,163 records on U.S. real Gross Domestic Product by county, covering the years 2017–2020. The dataset contains many columns. The following figure shows an excerpt of the dataset:

Image by Author


2 Setup of the Database

We will store all the data in a MySQL server and access it through Trino. Trino is a distributed, open-source SQL query engine for big data analytics: it runs distributed, parallel queries and is very fast. For the Trino setup, you can follow my previous tutorial.

In this example, Trino is running locally, with the following minimal config.properties configuration file:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=http://127.0.0.1:8080
http-server.https.enabled=false

In addition, Trino uses the MySQL catalog, with the following configuration (file mysql.properties, located in the catalog folder of the Trino server):

connector.name=mysql
connection-url=jdbc:mysql://localhost:3306
connection-user=root
connection-password=
allow-drop-table=true

Finally, this example assumes that an empty schema called life-expectancy exists on the MySQL server.
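
If the schema does not exist yet, a minimal sketch of how to create it is shown below: the backticks on the MySQL side are needed because the name contains a hyphen, and from any Trino client you can then check that the mysql catalog exposes the new schema.

-- On the MySQL server: create the empty schema used in this example
CREATE DATABASE `life-expectancy`;

-- From a Trino client: verify that the mysql catalog exposes the schema
SHOW SCHEMAS FROM mysql;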

3 Data Ingestion in VDK

VDK runs with the following configuration (config.ini):

db_default_type=TRINO
ingest_method_default = trino
trino_catalog = mysql
trino_use_ssl =
trino_host = localhost
trino_port = 8080
trino_user = root
trino_schema = life-expectancy
trino_ssl_verify =

Data ingestion uploads the CSV datasets described above into the database. For each table, ingestion is performed through the following steps:

  • delete the existing table (if any)
  • create a new table
  • ingest table values directly from the CSV file.

The first two steps will be written using the SQL language, while the last one will be written in Python.

Each dataset will be ingested into a table with a name similar to the dataset name. For example, the U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015 dataset will be ingested into the life_expectancy_2010_2015 table.

3.1 Delete the existing table

First, I create a script named 01_delete_table_life_expectancy_2010_2015.sql:

DROP TABLE IF EXISTS life_expectancy_2010_2015

The number 01 preceding the script name indicates that the VDK framework will run it as the first script.
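
For reference, a VDK data job is simply a directory containing the numbered step files together with the job's config.ini, and the steps are executed in alphabetical order. A possible layout for this example might look as follows (only the file names quoted in this article are real; the remaining steps are omitted):

life-expectancy/
├── 01_delete_table_life_expectancy_2010_2015.sql
├── 02_delete_table_life_expectancy_2018.sql
├── 03_create_table_life_expectancy_2010_2015.sql
├── 04_create_table_life_expectancy_2018.sql
├── ...  (remaining delete/create steps and the Python ingestion steps)
└── config.ini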

I also create the 02_delete_table_life_expectancy_2018.sql script, which deletes the life_expectancy_2018 table, as well as analogous scripts that delete the us_regions and us_gdp tables.
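
Those scripts are not shown in the article; following the same pattern as the first one, they presumably contain statements like the following:

DROP TABLE IF EXISTS us_regions

DROP TABLE IF EXISTS us_gdp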

3.2 Create a new table

Now, I create the schema for the new table, and I store it in the 03_create_table_life_expectancy_2010_2015.sql script:

CREATE TABLE life_expectancy_2010_2015 (
State varchar(32),
County varchar(32),
CensusTractNumber varchar(32),
LifeExpectancy decimal(4,2),
LifeExpectancyRange varchar,
LifeExpectancyStandardError decimal(4,2)
)

Similar to the previous script, I create the 04_create_table_life_expectancy_2018.sql:

CREATE TABLE life_expectancy_2018 (
State varchar(32),
Sex varchar(32),
LEB decimal(3,1),
SE decimal(3,1),
Quartile varchar(32)
)

as well as the us_regions and us_gdp tables:

CREATE TABLE us_regions (
State varchar(32),
StateCode varchar(2),
Region varchar(32),
Division varchar(32)
)

CREATE TABLE us_gdp (
County varchar(32),
Year2017 bigint,
Year2018 bigint,
Year2019 bigint,
Year2020 bigint
)

3.3 Ingest table values directly from the CSV file

Finally, I can ingest the table values into the life_expectancy_2010_2015 table. I use the IJobInput class provided by the VDK API. I define a run() function, which will be executed by the VDK framework, and within it I write the ingestion code:

import numpy as np
import pandas as pd
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Read the CSV file
    url = "http://data.cdc.gov/api/views/5h56-n989/rows.csv"
    dtypes = {
        "State": str,
        "County": str,
        "Census Tract Number": str,
        "Life Expectancy": np.float64,
        "Life Expectancy Range": str,
        "Life Expectancy Standard Error": np.float64,
    }
    df = pd.read_csv(url, dtype=dtypes).replace("'", "''", regex=True)
    df.columns = df.columns.str.replace(" ", "")

    # Ingest the CSV file into the database
    job_input.send_tabular_data_for_ingestion(
        df.itertuples(index=False),
        destination_table="life_expectancy_2010_2015",
        column_names=df.columns.tolist(),
    )

To ingest every row of the CSV in the database, I use the send_tabular_data_for_ingestion() method of the IJobInput class.

We can ingest the other datasets with the same technique.
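
As an illustration, a possible ingestion step for the us_regions table is sketched below. Note that the source URL is a placeholder (the original job's URL is not shown here), and all columns are read as strings to match the us_regions schema.

import pandas as pd
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Placeholder source: a CSV listing each state with its census region and division
    url = "https://example.com/us_census_regions.csv"
    df = pd.read_csv(url, dtype=str).replace("'", "''", regex=True)
    df.columns = df.columns.str.replace(" ", "")

    # Send the rows to the us_regions table created in the previous steps
    job_input.send_tabular_data_for_ingestion(
        df.itertuples(index=False),
        destination_table="us_regions",
        column_names=df.columns.tolist(),
    )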

4 Data Processing in VDK

Data Processing includes the following tasks:

  • Clean tables
  • Merge the cleaned tables

4.1 Clean tables

Cleaning the life_expectancy_2010_2015 table includes the following two operations:

  • aggregate records at the State level
  • split the column LifeExpectancyRange into two decimal columns, MinLifeExpectancyRange and MaxLifeExpectancyRange.

The output of the cleaning process for the life_expectancy_2010_2015 table is stored in a new table, called cleaned_life_expectancy_2010_2015.

Both operations can be implemented with a single SQL statement:

CREATE TABLE cleaned_life_expectancy_2010_2015 AS (
  SELECT State,
    LifeExpectancy,
    cast(split(life_expectancy_2010_2015.LifeExpectancyRange, '-')[1] AS decimal(4,2)) AS MinLifeExpectancyRange,
    cast(split(life_expectancy_2010_2015.LifeExpectancyRange, '-')[2] AS decimal(4,2)) AS MaxLifeExpectancyRange,
    LifeExpectancyStandardError
  FROM life_expectancy_2010_2015
  WHERE County = '(blank)'
)

In the dataset, all the rows with County = '(blank)' contain the total value of life expectancy for a given State. Thus, I can easily aggregate at the State level simply by selecting those rows.
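
If you want to double-check this on the raw data, you can inspect a few of those rows directly in Trino:

SELECT State, County, LifeExpectancy
FROM life_expectancy_2010_2015
WHERE County = '(blank)'
LIMIT 5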

The following figure shows the resulting table:

Image by Author

Now I clean the life_expectancy_2018 table, which involves the following operations:

  • rename column LEB to LifeExpectancy
  • rename column SE to LifeExpectancyStandardError
  • split the column Quartile into two decimal columns, MinLifeExpectancyRange and MaxLifeExpectancyRange
  • select only rows with Sex = 'Total'.

All the previous operations can be implemented with a single SQL statement:

CREATE TABLE cleaned_life_expectancy_2018 AS (
  SELECT State,
    LEB AS LifeExpectancy,
    cast(split(life_expectancy_2018.Quartile, ' - ')[1] AS decimal(4,2)) AS MinLifeExpectancyRange,
    cast(split(life_expectancy_2018.Quartile, ' - ')[2] AS decimal(4,2)) AS MaxLifeExpectancyRange,
    SE AS LifeExpectancyStandardError
  FROM life_expectancy_2018
  WHERE Sex = 'Total' AND State <> 'United States'
)

The following figure shows an excerpt of the cleaned_life_expectancy_2018 table:

Image by Author

Note that, after cleaning, the cleaned_life_expectancy_2010_2015 and cleaned_life_expectancy_2018 tables have the same schema.
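
You can verify this with Trino's DESCRIBE statement, which lists the columns and types of each table:

DESCRIBE cleaned_life_expectancy_2010_2015;
DESCRIBE cleaned_life_expectancy_2018;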

4.2 Merge the cleaned tables

Finally, I’m ready to merge all the tables. I perform the following operations:

  • vertical merging between the cleaned_life_expectancy_2010_2015 and cleaned_life_expectancy_2018 tables;
  • horizontal merging between the resulting table and the us_regions and us_gdp tables.

Vertical merging means that the second dataset is appended to the first one, while horizontal merging adds three new columns, called Period, GDP, and Region, to the resulting table, which is named merged_life_expectancy. The GDP attribute is set only for records with Period = '2018'; for the other records, it is set to 0, since it is not available.

CREATE TABLE merged_life_expectancy AS (
  SELECT us_regions.State,
    LifeExpectancy,
    MinLifeExpectancyRange,
    MaxLifeExpectancyRange,
    '2010-2015' AS Period,
    Region,
    0 AS GDP
  FROM cleaned_life_expectancy_2010_2015
  JOIN us_regions
    ON us_regions.State = cleaned_life_expectancy_2010_2015.State
)
UNION
(
  SELECT us_regions.State,
    LifeExpectancy,
    MinLifeExpectancyRange,
    MaxLifeExpectancyRange,
    '2018' AS Period,
    Region,
    Year2018 AS GDP
  FROM cleaned_life_expectancy_2018
  JOIN us_regions
    ON us_regions.State = cleaned_life_expectancy_2018.State
  INNER JOIN us_gdp
    ON us_gdp.County = cleaned_life_expectancy_2018.State
  WHERE Year2018 > 100000000
)

In the second SELECT statement, the WHERE condition specifies Year2018 > 100000000. This makes it possible to select only Counties.
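
A quick way to see which rows pass this threshold is to look at the largest values in the us_gdp table, for example:

SELECT County, Year2018
FROM us_gdp
ORDER BY Year2018 DESC
LIMIT 10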

The final table is shown in the following figure:

Image by Author

Summary

Congratulations! You have just learned how to ingest and process data in VDK! This can be achieved by writing simple scripts in SQL and Python.

The full code of this example is available here.

The next step is to build a report that shows the results of processing. Stay tuned to learn how to do it :)

For questions or doubts about Versatile Data Kit, you can join their public Slack workspace, subscribe to their mailing list, or follow them on Twitter.

If you have read this far, it already means a lot to me. Thanks! You can read more about me in this article.
