From Raw Data to a Cleaned Database: A Deep Dive into Versatile Data Kit
A complete example using the Versatile Data Kit (a Framework recently released by VMware) and Trino DB
Recently, VMware released a new open-source tool called Versatile Data Kit (VDK, for short), which makes it possible to manage data very quickly. The tool can ingest data in different formats into a single database with just a few lines of code.
In my previous article, I described a basic example that uses VDK, as well as how to install and run it. To summarize, you should:
- have a running database (external to VDK)
- configure in VDK the interface to the database, including username, password, host, and port
- define in VDK how to ingest data in the database, using SQL queries or more complex procedures implemented in Python.
Once you have ingested data in the database, you can use them as you want, for example, to build interesting dashboards or visualizations.
In this article, I focus on a complete example, which starts from data ingestion, up to data processing. The example uses a Trino Database with a MySQL server to store data and VDK to ingest data in the database.
The following figure shows a possible architecture including VDK, Trino Server, and a MySQL Server:
In detail, the article is organized as follows:
- Definition of the Scenario
- Setup of the Database
- Data Ingestion in VDK
- Data Processing in VDK
1 Definition of the Scenario
The objective of the scenario is to analyze life expectancy in the different U.S. regions and compare it with each region's Gross Domestic Product (GDP). To achieve this objective, you should download data related to U.S. life expectancy and merge it with the associated GDP.
This example shows how to extract and integrate data needed for this scenario through VDK.
We can use the following two datasets, both of which are available as CSV files:
- U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015
- U.S. State Life Expectancy by Sex, 2018
The previous datasets are released under the U.S. Government Works license.
In addition to the previous datasets, we can use the following additional datasets:
- U.S. Gross Domestic Product by County
- U.S. Census Bureau Regions and Divisions (extracted from this official link)
The previous two datasets are released by the U.S. Bureau of Economic Analysis and the U.S. Census Bureau, respectively, both providing open data under a public license.
The dataset U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015 contains 73,121 records relating to the life expectancy of Americans, divided by state and county and relating to the period 2010–2015.
The following figure shows an excerpt of the dataset:
The dataset U.S. State Life Expectancy by Sex, 2018 contains 156 records relating to the life expectancy of Americans in 2018, divided by state and sex (male, female, total). The following figure shows an excerpt of the dataset:
The dataset U.S. Gross Domestic Product by County contains 3,163 records on U.S. real Gross Domestic Product by county, covering the years 2017–2020. The dataset contains many columns. The following figure shows an excerpt of the dataset:
2 Setup of the Database
We will store all the data in a Trino database acting as an interface to a MySQL server. Trino is a distributed open-source SQL query engine for big data analytics: it runs distributed, parallel queries and is very fast. For the Trino database setup, you can follow my previous tutorial.
In this example, Trino is running locally, with the following minimal config.properties
configuration file:
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=http://127.0.0.1:8080
http-server.https.enabled=false
In addition, the Trino DB uses the MySQL catalog, with the following configuration (file mysql.properties, located in the catalog folder of the Trino server):
connector.name=mysql
connection-url=jdbc:mysql://localhost:3306
connection-user=root
connection-password=
allow-drop-table=true
Finally, this example assumes that an empty schema called life-expectancy
exists on the MySQL server.
3 Data Ingestion in VDK
VDK runs with the following configuration (config.ini
):
db_default_type=TRINO
ingest_method_default = trino
trino_catalog = mysql
trino_use_ssl =
trino_host = localhost
trino_port = 8080
trino_user = root
trino_schema = life-expectancy
trino_ssl_verify =
Data ingestion uploads into the database the four CSV datasets described in the scenario section. For each table, data ingestion is performed through the following steps:
- delete the existing table (if any)
- create a new table
- ingest table values directly from the CSV file.
The first two steps will be written using the SQL language, while the last one will be written in Python.
Each dataset will be ingested into a table with a name similar to the dataset name. For example, the U.S. Life Expectancy at Birth by State and Census Tract — 2010–2015 dataset will be ingested into the life_expectancy_2010_2015
table.
3.1 Delete the existing table
Firstly, I create a script, named 01_delete_table_life_expectancy_2010_2015.sql
:
DROP TABLE IF EXISTS life_expectancy_2010_2015
The number 01
preceding the script name indicates that the VDK framework will run it as the first script.
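VDK executes the files of a data job in alphabetical order, so zero-padded numeric prefixes are a simple way to control the execution sequence. A quick Python sketch of the idea (the .py file name below is hypothetical):

```python
# VDK runs a data job's steps in alphabetical (lexicographic) order,
# so zero-padded numeric prefixes determine the execution sequence.
steps = [
    "03_create_table_life_expectancy_2010_2015.sql",
    "01_delete_table_life_expectancy_2010_2015.sql",
    "10_ingest_life_expectancy_2010_2015.py",  # hypothetical step name
    "02_delete_table_life_expectancy_2018.sql",
]

execution_order = sorted(steps)
print(execution_order[0])  # 01_delete_table_life_expectancy_2010_2015.sql
```

Note that the zero padding matters: without it, a step named 10_… would sort before 2_….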
I also create the 02_delete_table_life_expectancy_2018.sql
script, which deletes the life_expectancy_2018
table and the other scripts to delete the us_regions
and us_gdp
tables.
3.2 Create a new table
Now, I create the schema for the new table, and I store it in the 03_create_table_life_expectancy_2010_2015.sql
script:
CREATE TABLE life_expectancy_2010_2015 (
State varchar(32),
County varchar(32),
CensusTractNumber varchar(32),
LifeExpectancy decimal(4,2),
LifeExpectancyRange varchar,
LifeExpectancyStandardError decimal(4,2)
)
Similar to the previous script, I create the 04_create_table_life_expectancy_2018.sql
:
CREATE TABLE life_expectancy_2018 (
State varchar(32),
Sex varchar(32),
LEB decimal(3,1),
SE decimal(3,1),
Quartile varchar(32)
)
as well as the us_regions
and us_gdp
tables:
CREATE TABLE us_regions (
State varchar(32),
StateCode varchar(2),
Region varchar(32),
Division varchar(32)
)

CREATE TABLE us_gdp (
County varchar(32),
Year2017 bigint,
Year2018 bigint,
Year2019 bigint,
Year2020 bigint
)
3.3 Ingest table values directly from the CSV file
Finally, I can ingest table values in the life_expectancy_2010_2015
table. I use the IJobInput
class provided by the VDK API. I define a run()
function, which will be called by the VDK framework, and within it, I write the ingestion code:
import numpy as np
import pandas as pd
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Read the CSV file
    url = "http://data.cdc.gov/api/views/5h56-n989/rows.csv"
    dtypes = {
        "State": str,
        "County": str,
        "Census Tract Number": str,
        "Life Expectancy": np.float64,
        "Life Expectancy Range": str,
        "Life Expectancy Standard Error": np.float64,
    }
    df = pd.read_csv(url, dtype=dtypes).replace("'", "''", regex=True)
    df.columns = df.columns.str.replace(" ", "")

    # Ingest the CSV file
    job_input.send_tabular_data_for_ingestion(
        df.itertuples(index=False),
        destination_table="life_expectancy_2010_2015",
        column_names=df.columns.tolist(),
    )
To ingest every row of the CSV in the database, I use the send_tabular_data_for_ingestion()
method of the IJobInput
class.
We can ingest the other datasets with the same technique.
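Since every dataset needs the same preparation (escaping single quotes and stripping spaces from column names), the ingestion scripts can share a small helper that differs only in the source URL and the destination table. A minimal sketch (prepare_for_ingestion is my name for it, not part of the VDK API):

```python
import pandas as pd


def prepare_for_ingestion(df: pd.DataFrame) -> pd.DataFrame:
    """Escape single quotes in string values and strip spaces from column names."""
    df = df.replace("'", "''", regex=True)
    df.columns = df.columns.str.replace(" ", "")
    return df


# Example with a tiny in-memory dataset
raw = pd.DataFrame({"Life Expectancy": [78.5], "County": ["O'Brien"]})
clean = prepare_for_ingestion(raw)
print(clean.columns.tolist())  # ['LifeExpectancy', 'County']
print(clean["County"][0])      # O''Brien
```

In a data job, the cleaned DataFrame would then be passed to job_input.send_tabular_data_for_ingestion(), exactly as in the script above.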
4 Data Processing in VDK
Data Processing includes the following tasks:
- Clean tables
- Merge the cleaned tables
4.1 Clean tables
Cleaning the life_expectancy_2010_2015
table includes the following two operations:
- group records by state
- split the column LifeExpectancyRange into two decimal columns, MinLifeExpectancyRange and MaxLifeExpectancyRange.
The output of the cleaning process for the life_expectancy_2010_2015
table is stored in a new table, called cleaned_life_expectancy_2010_2015
.
Both the previous operations can be implemented through a SQL statement:
CREATE TABLE cleaned_life_expectancy_2010_2015 AS (
SELECT State,
LifeExpectancy,
cast(split(life_expectancy_2010_2015.LifeExpectancyRange,'-')[1] AS decimal(4,2)) AS MinLifeExpectancyRange,
cast(split(life_expectancy_2010_2015.LifeExpectancyRange,'-')[2] AS decimal(4,2)) AS MaxLifeExpectancyRange,
LifeExpectancyStandardError
FROM life_expectancy_2010_2015
WHERE County = '(blank)'
)
In the dataset, all the rows with County = '(blank)'
contain the total value of life expectancy for a given state. Thus, I can easily obtain state-level values simply by selecting those rows.
The following Figure shows the resulting table:
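As a side note, the same range split can be reproduced in pandas, which is handy for checking the SQL logic offline. A standalone sketch with made-up values:

```python
import pandas as pd

df = pd.DataFrame({
    "State": ["Alabama", "Alaska"],
    "LifeExpectancyRange": ["56.9-75.1", "62.3-83.5"],  # sample values
})

# Split the "min-max" string on '-' and cast both parts to float,
# mirroring the split(...)[1] / split(...)[2] calls in the SQL query.
parts = df["LifeExpectancyRange"].str.split("-", expand=True)
df["MinLifeExpectancyRange"] = parts[0].astype(float)
df["MaxLifeExpectancyRange"] = parts[1].astype(float)

print(df[["MinLifeExpectancyRange", "MaxLifeExpectancyRange"]].iloc[0].tolist())
# [56.9, 75.1]
```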
Now, I clean the life_expectancy_2018
table. Cleaning the life_expectancy_2018
table includes the following operations:
- rename column LEB to LifeExpectancy
- rename column SE to LifeExpectancyStandardError
- split the column Quartile into two decimal columns, MinLifeExpectancyRange and MaxLifeExpectancyRange
- select only rows with Sex = 'Total'.
All the previous operations can be implemented through a SQL statement:
CREATE TABLE cleaned_life_expectancy_2018 AS (
SELECT State,
LEB AS LifeExpectancy,
cast(split(life_expectancy_2018.Quartile,' - ')[1] AS decimal(4,2)) AS MinLifeExpectancyRange,
cast(split(life_expectancy_2018.Quartile,' - ')[2] AS decimal(4,2)) AS MaxLifeExpectancyRange,
SE AS LifeExpectancyStandardError
FROM life_expectancy_2018
WHERE Sex = 'Total' and State <> 'United States'
)
The following table shows an example of the cleaned_life_expectancy_2018
table:
Note that, after cleaning, the cleaned_life_expectancy_2010_2015
and cleaned_life_expectancy_2018
tables have the same schema.
4.2 Merge the cleaned tables
Finally, I’m ready to merge all the tables. I perform the following operations:
- vertical merging between the cleaned_life_expectancy_2010_2015 and cleaned_life_expectancy_2018 tables;
- horizontal merging between the resulting table and the us_regions and us_gdp tables.
Vertical merging means that the second dataset is appended to the first one, while horizontal merging adds three new columns, called Period, GDP, and Region, to the resulting table, which is named merged_life_expectancy. The GDP attribute is set only for records with Period = '2018'. For the other records, it is set to 0, since it is not available.
CREATE TABLE merged_life_expectancy AS (
SELECT us_regions.State,
LifeExpectancy,
MinLifeExpectancyRange,
MaxLifeExpectancyRange,
'2010-2015' AS Period,
Region,
0 AS GDP
FROM cleaned_life_expectancy_2010_2015
JOIN us_regions
ON us_regions.State = cleaned_life_expectancy_2010_2015.State
)
UNION
(
SELECT us_regions.State,
LifeExpectancy,
MinLifeExpectancyRange,
MaxLifeExpectancyRange,
'2018' AS Period,
Region,
Year2018 AS GDP
FROM cleaned_life_expectancy_2018
JOIN us_regions
ON us_regions.State = cleaned_life_expectancy_2018.State
INNER JOIN us_gdp
ON us_gdp.County = cleaned_life_expectancy_2018.State
WHERE Year2018 > 100000000
)
In the second SELECT statement, the WHERE condition specifies Year2018 > 100000000. This permits selecting only state-level GDP records: since the join matches the County column of us_gdp against state names, and some counties share their name with a state, the threshold discards the much smaller county-level values.
The final table is shown in the following table:
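The vertical and horizontal merges can also be illustrated in pandas on toy data; the table and column names follow the article, while the values are invented:

```python
import pandas as pd

# Toy versions of the cleaned tables (same schema) and lookup tables
le_2010_2015 = pd.DataFrame({"State": ["Alabama"], "LifeExpectancy": [75.5]})
le_2018 = pd.DataFrame({"State": ["Alabama"], "LifeExpectancy": [75.1]})
us_regions = pd.DataFrame({"State": ["Alabama"], "Region": ["South"]})
us_gdp = pd.DataFrame({"County": ["Alabama"], "Year2018": [221000000]})

# GDP is only available for 2018; older records get 0
le_2010_2015["Period"], le_2010_2015["GDP"] = "2010-2015", 0
le_2018["Period"] = "2018"
le_2018 = (
    le_2018.merge(us_gdp, left_on="State", right_on="County")
    .rename(columns={"Year2018": "GDP"})
    .drop(columns="County")
)

# Vertical merge (UNION): append the 2018 rows to the 2010-2015 rows
merged = pd.concat([le_2010_2015, le_2018], ignore_index=True)

# Horizontal merge (JOIN): add the Region column
merged = merged.merge(us_regions, on="State")
print(merged[["State", "Period", "GDP", "Region"]])
```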
Summary
Congratulations! You have just learned how to ingest and process data in VDK! This can be achieved by writing simple scripts in SQL and Python.
The full code of this example is available here.
The next step is to build a report that shows the results of processing. Stay tuned to learn how to do it :)
For questions or doubts on Versatile Data Kit, you can join their public Slack workspace or their mailing list, or follow them on Twitter.
If you have read this far, for me it is already a lot for today. Thanks! You can read more about me in this article.