How to Solve the Real Big Data Problems - Load Huge Excel Files

Hint: multi-threading. Using PDI and PostgreSQL, we can load a huge Excel or CSV file in a matter of minutes.

Shravankumar Suvarna
Towards Data Science

--

More and more companies have started to realize the importance of data. Hence, they come with requests to load huge CSV or Excel files from their legacy systems or manual processes into a database for data-driven analytics. I know we now have a lot of solutions to this problem, such as the Python libraries pandas, Dask and Vaex, or tools like Informatica.

However, it’s always fun to learn different approaches to a problem statement. We will be using PDI to solve this problem and PostgreSQL as our database. The idea here is to make optimal use of our system’s capabilities; I know not all of us have the perk of memory-optimized servers at our disposal.


If you are new to the data pipeline building process, then I recommend going through the story below.

Prerequisite

  • PDI: Pentaho Data Integration installed on your system. You can use the link for a step-by-step installation guide.
  • PostgreSQL: We can use any relational or non-relational database as per our preference. If you want to follow along, then please use the link to install the same.

User Stories

I like to define the user stories for our problem statement first. This helps me design the high-level architecture of the data pipeline. We should always break the problem into small, easy, individual pieces.

  1. I want to read a huge CSV file with economic data containing millions of records.
  2. I want to perform a lookup for each record in the data against a dimension/master table.
  3. I want to clean the data and remove NULL fields.
  4. I want to add a row-level condition: append ‘_R’ to the series reference if the status column contains the word ‘Revised’.
  5. I want to load the result into a PostgreSQL database.

We are trying to replicate a real-world scenario by adding a little data-manipulation complexity as well.

Input data

It is a good practice to understand the input data files. In our case, it might be difficult to open the huge CSV file and check the columns and rows. However, there are ways to inspect sample data. PDI lets us read sample data and check other metadata by creating a small transformation.
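If you prefer to peek at the file outside PDI first, a few lines of Python will do; this is just a convenience sketch, and ‘economic_data_huge.csv’ is a placeholder for whatever file you are working with.

import pandas as pd

# Read only the first 1,000 rows so we never load the full file into memory.
sample = pd.read_csv("economic_data_huge.csv", nrows=1000)

print(sample.dtypes)    # column names and inferred types
print(sample.head(10))  # a quick look at the actual values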

Here, I Googled the term ‘huge data file in csv’ and downloaded the file from the first website. Here’s the link.

Now, I wanted to stress the system and create a huge file; we are dealing with Big Data, right? The downloaded file had 66,526 records, so I appended the same records multiple times to create a file with around 11,974,697 records; yeah, not that big.
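If you want to reproduce a similarly bloated file, a rough Python sketch is shown below; the file names and the repetition count are assumptions, so adjust them to taste.

# Append the same 66,526-record file to itself repeatedly to fake a 'big' file.
# economic_data_small.csv and economic_data_huge.csv are placeholder names.
with open("economic_data_small.csv") as source:
    header = source.readline()
    rows = source.readlines()

with open("economic_data_huge.csv", "w") as target:
    target.write(header)
    for _ in range(180):  # 180 x 66,526 is roughly 12 million records
        target.writelines(rows)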

Test Cases

Defining test cases is important here since we cannot manually check the entire data set. We need to make sure we check a large enough sample to cross-validate the accuracy.

  1. Check the row count and match it with the input data. Please note that since we will be removing the NULL records, it is important to store those rejected records as well; a minimal reconciliation sketch follows this list.
  2. Cross-validate the dimension lookup output for at least 500-1,000 records.
  3. Cross-validate the calculations randomly to check for accuracy; again, for at least a thousand records.
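Here is a minimal sketch of the first check in Python, assuming psycopg2 is installed; the file paths and connection details are placeholders. The rows loaded into the table plus the NULL rows written to the error report should add up to the input row count.

import psycopg2  # assumption: psycopg2 is available

def count_csv_rows(path):
    # Count data rows, skipping the header line.
    with open(path) as f:
        return sum(1 for _ in f) - 1

input_rows = count_csv_rows("Input/economic_data_huge.csv")  # placeholder path
error_rows = count_csv_rows("error_report.csv")              # records rejected as NULL

conn = psycopg2.connect(dbname="medium_db", user="YOURUSER", password="...")
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM public.economic_data")
    loaded_rows = cur.fetchone()[0]
conn.close()

assert loaded_rows + error_rows == input_rows, "Row counts do not reconcile"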

Step-1: Set-up for the project

We have a pretty simple set-up for this project: just one directory and one transformation file.

I prefer to create all my work-related projects in one single directory named ‘Work’; I know, how creative! We need to perform the steps below; you can skip this step if you prefer your own structure.

  1. Create our project directory - LoadData.
  2. Create a directory ‘Input’ within the project directory.
  3. Create an empty Transformation named ‘Main.ktr’ within the project directory.
Project Directory Structure

If you are not familiar with terms like transformation or job, then I recommend the story mentioned below.

Step-2: Create Database Table

I am assuming that you already have the database installed. We are using PostgreSQL.

Now, I prefer to create tables using Django Models. You don’t necessarily have to use this methodology.

Having said that, it makes our life easier to write Django models instead of manually creating tables and columns natively. Django does that for us through a simple migrations command, and we also get CRUD (create, read, update and delete) functionality out of the box.

You can choose either of the two options mentioned below to create the database table. I have created a database named medium_db.

  • PostgreSQL create script.
-- Table: public.economic_data
-- DROP TABLE public.economic_data;

CREATE TABLE public.economic_data
(
    id integer NOT NULL DEFAULT nextval('economic_data_id_seq'::regclass),
    series_reference character varying(255) COLLATE pg_catalog."default" NOT NULL,
    indicator_name character varying(255) COLLATE pg_catalog."default" NOT NULL,
    period character varying(45) COLLATE pg_catalog."default" NOT NULL,
    indicator_value numeric(30,10) NOT NULL,
    status character varying(255) COLLATE pg_catalog."default" NOT NULL,
    indicator_unit character varying(255) COLLATE pg_catalog."default" NOT NULL,
    group_name character varying(255) COLLATE pg_catalog."default" NOT NULL,
    series_name character varying(255) COLLATE pg_catalog."default",
    CONSTRAINT economic_data_pkey PRIMARY KEY (id)
)
TABLESPACE pg_default;

ALTER TABLE public.economic_data
    OWNER to YOURUSER;
  • Django Model script to run migrations.
from django.db import models


# Create your models here.
class EconomicData(models.Model):
    series_reference = models.CharField(
        db_column="series_reference",
        max_length=255,
        help_text="Unique code to identify a particular record",
        verbose_name="Series Reference",
    )
    indicator_name = models.CharField(
        db_column="indicator_name",
        max_length=255,
        verbose_name="Name of the indicators",
    )
    period = models.CharField(
        db_column="period",
        max_length=45,
        verbose_name="Period",
    )
    indicator_value = models.DecimalField(
        db_column="indicator_value",
        max_digits=30,
        decimal_places=10,
        verbose_name="Value of the Field",
    )
    status = models.CharField(
        db_column="status",
        max_length=255,
        verbose_name="Status of the value. For eg, Final or Revised",
    )
    indicator_unit = models.CharField(
        db_column="indicator_unit",
        max_length=255,
        verbose_name="Unit of the indicators",
    )
    group_name = models.CharField(
        db_column="group_name",
        max_length=255,
        verbose_name="Group of the indicators",
    )
    series_name = models.CharField(
        db_column="series_name",
        max_length=255,
        verbose_name="Series of the indicators",
        null=True,
    )

    def __str__(self):
        return f"{self.indicator_name} - {self.indicator_value}"

    class Meta:
        db_table = "economic_data"
        verbose_name = "Economic Data"
        verbose_name_plural = "Economic Data"
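With the model in place, the table itself comes from Django's standard workflow: assuming a regular Django project, running python manage.py makemigrations followed by python manage.py migrate creates the economic_data table for us.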

If you are interested in understanding how we can benefit from Django in our data pipeline, then please do let me know the same in the response section below.

We have our table ready. Now, let’s create our transformation.

Step-3: Loader Transformation

We need to create a loader transformation which reads our input CSV, performs the manipulations and loads the data into the database.

We need to open our Main.ktr file and drag some plugins onto the canvas as mentioned below.

Step-1: Drag Steps

  1. Firstly, let’s add a small description of the transformation. Documentation is key for any data pipeline.
  2. Drag ‘CSV file input’, ‘Data grid’, ‘Join rows (cartesian product)’, ‘User defined Java expression’, ‘Filter rows’, ‘Text file output’, ‘Table Output’ plugins from the design tab onto the canvas.
  3. Rename the steps as per our naming convention.
Main transformation after renaming

Step-2: Configure Properties

  1. We need to configure the properties for each of the above-mentioned steps. Let’s configure the CSV file input step first: browse to our input file in the Filename field and click on Get Fields. We can tweak the NIO buffer size as per our memory availability; it will process the file in batches of 50K records each.
  2. We need to add data in the Data grid step (replicating a table here). Here, we are using a data grid for example data; in a real-world scenario, you would get this data from a dimension table. We are standardizing the group names. You can refer to the screenshot below for the data. We need to add the column names in the Meta tab and the actual data in the Data tab.
  3. In the Join rows step, we need to map the fields that we want from the input with our dimension table/grid. Since we are mapping groups here, we need to add the Condition to map the same.
  4. In the User defined Java expression step, we will configure the custom row-level condition. We need to define our New field as ‘series_reference_flag’; here we want to take the ‘Series_reference’ field and append ‘_R’ if the status column is ‘Revised’. In the Java expression, we add the condition status.equals("Revised") ? Series_reference + "_R" : Series_reference (this is Java code, and string comparison needs .equals() rather than ==). We can perform similar conditions or calculations; powerful! Lastly, we need to set the Value type to ‘String’. A plain-Python illustration of this rule follows the screenshots below.
  5. In the Filter rows step, we need to define our condition of passing records without null values.
  6. In the Text file output step (our error report), we need to set the Filename to ${Internal.Entry.Current.Directory}/error_report and change the Extension to ‘csv’.
  7. In the Table output step, we need to create a new Connection to connect to our database. We can connect to any database as per our requirements; here we will connect to PostgreSQL (refer to the screenshot for the connection details). We need to set the Target table to ‘economic_data’, check the Specify database fields option, and then map the input/transformation fields to the table fields.
Input Step Configurations
Data Grid Configurations - Dimension
Join rows Configurations
Condition Configurations
Filtering Null Configurations
Database Output Configurations
Database Mapping
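If the row-level rule in step four feels abstract, here is the same logic expressed as plain Python, purely as an illustration of what the User defined Java expression and Filter rows steps do for every record; it is not how PDI executes it.

def transform_row(row):
    # row is a dict of the CSV/lookup fields, e.g. {"Series_reference": ..., "status": ...}
    if any(value is None or value == "" for value in row.values()):
        return None  # the Filter rows step would route this record to the error report

    # The User defined Java expression step builds the flag field.
    if row["status"] == "Revised":
        row["series_reference_flag"] = row["Series_reference"] + "_R"
    else:
        row["series_reference_flag"] = row["Series_reference"]
    return row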

Step-4: Let’s Speed Up the Process

Now that we have configured the properties, we can speed up the process by creating multiple threads for inserting the data. This will boost the performance. PDI lets us configure multi-threading per step. If we use it on an input step, it will multiply the records, whereas if we use it on an output step, such as the database output, it will distribute the records.

PDI provides us with many options to optimize performance. We can take the steps below to boost it.

  1. Change the NIO buffer size in our input step; this defines the read batch size.
  2. Change the Max. cache size (in rows) in a lookup step; this defines the number of rows kept in cache memory instead of querying the database.
  3. Change the Commit size in the table output step; similar to the buffer size, this sets the batch size for inserting records.
  4. Use multiple threads to perform the activity: we can add a Dummy step before our output step, right-click it and select Change Number of Copies to Start, and raise it from 1 to anything above 1 as per our requirements.
  5. Please note that if we want to run multiple threads on the table output step itself, then we cannot use point number four. We would instead have to add a Dummy step before the output and distribute the records across multiple table output steps.

Wow! We have so many options; should we apply all of these performance optimizers? The short answer is NO. We need to try with sample data and run multiple tests to see what works best for us.
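Outside PDI, the same multi-threading idea looks roughly like the sketch below: a couple of writer threads, each with its own connection, inserting batches and committing once per batch (the commit size). The column list matches our economic_data table, but the connection details are placeholders and execute_values comes from psycopg2.extras; treat it as an illustration, not as what PDI does internally.

import threading
import psycopg2
from psycopg2.extras import execute_values

INSERT_SQL = """
    INSERT INTO public.economic_data
        (series_reference, indicator_name, period, indicator_value,
         status, indicator_unit, group_name, series_name)
    VALUES %s
"""

def writer(batches):
    # Each thread gets its own connection; commits happen once per batch.
    conn = psycopg2.connect(dbname="medium_db", user="YOURUSER", password="...")
    with conn, conn.cursor() as cur:
        for batch in batches:  # batch is a list of row tuples
            execute_values(cur, INSERT_SQL, batch)
            conn.commit()
    conn.close()

def load_in_parallel(all_batches, threads=2):
    # Distribute the batches round-robin across the writer threads.
    chunks = [all_batches[i::threads] for i in range(threads)]
    workers = [threading.Thread(target=writer, args=(chunk,)) for chunk in chunks]
    for w in workers:
        w.start()
    for w in workers:
        w.join()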

Step-5: Evaluation

Let’s run the flow without any performance optimizations first and then compare it with a run that applies them.

Success
Step Matrix - It took 8m 42s
Added simple optimizer — 20X on Condition and Two Threads for Output
Step Matrix — It took 4m 35s

We reduced the time taken to perform the same activity by almost 50% just by adding a few simple performance optimizations.

If we want to read multiple files and create a loop to process them, then I recommend going through the story below.

Conclusion

We took a problem statement, solved it using one of several possible approaches and tried optimizing it as well. In theory, you can adapt this process to fit your requirements and try to optimize it further. PDI also provides a PostgreSQL bulk loader step; I have tried that step as well, but it did not provide any significant performance boost here.

We cannot fully optimize the code/pipeline at the beginning; we have to perform multiple tests to get the ideal results. However, to shorten the learning curve, you can always read through my experiences and find solutions to similar problem statements by subscribing to my email list using the link below.

See you in the next post. Happy ETL!
