
Compare PySpark DataFrames based on Grain

A simple approach to compare PySpark DataFrames based on grain and to generate reports with data samples

Photo by Myriam Jessier on Unsplash

Comparing two datasets and generating accurate, meaningful insights is a common and important task in the big data world. By running parallel jobs in PySpark, we can efficiently compare huge datasets based on grain and generate reports that pinpoint the difference at each column level.


Requirement:

We have a huge dataset ingested from Salesforce. The dataset has more than 600 columns and around 20 million rows. The dev and validation datasets have to be compared to generate a report that can pinpoint the differences at the column level based on grain.

Below are the insights to be generated, along with data samples containing the grain columns so they can be queried and analyzed further.

  • Duplicate records.
  • Missing records.
  • Capture the differences between column values in the dev and validation datasets.

Approach:

We can use the subtract method of the data frame to subtract two data frames, which gives us the records that do not match. However, it would be difficult to identify which of the 600 columns actually changed and what the differing value is. Below is a simple approach to tackle this.

From here on, I’ll be referring to the validation dataset as the source data frame (src_df) and the dev dataset as the target data frame (tar_df). Since the same two data frames are used again and again for different analytical queries, we can persist them in memory for faster analysis.
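A minimal sketch of that setup, assuming both datasets are available as Parquet files (the paths, format, and session name are illustrative, not from the original article):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("grain_compare").getOrCreate()

# Illustrative paths; replace with the actual locations of your datasets
src_df = spark.read.parquet("/data/validation_dataset")  # validation dataset (source)
tar_df = spark.read.parquet("/data/dev_dataset")         # dev dataset (target)

# Cache both data frames since every comparison query reuses them
src_df.persist()
tar_df.persist()
```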

In the approach below, it is assumed that both data frames have the same schema, i.e., the same number of columns, the same column names, and the same data type for every column.

Identify duplicate and missing grain records:

To identify duplicate records, we can write a small group-by query using PySpark functions. The grainDuplicateCheck() function does exactly that.
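A minimal sketch of what grainDuplicateCheck() can look like, assuming the grain columns are passed in as a list (the signature and the example grain are assumptions, not the original gist):

```python
from pyspark.sql import functions as F

def grainDuplicateCheck(df, grain_cols):
    """Return grain values that occur more than once, along with their counts."""
    return (
        df.groupBy(*grain_cols)
          .count()
          .filter(F.col("count") > 1)
    )

# Example with an assumed grain of (account_id, billing_period)
# grainDuplicateCheck(src_df, ["account_id", "billing_period"]).show(truncate=False)
```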

To identify missing grains, we can select grain columns from both data frames and subtract them directly.

It’s important to note that we need both directions: src_df minus tar_df gives us the valid grains that are missing from the target, while tar_df minus src_df gives us the grains of extra records that are invalid.
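A sketch of that check under the same assumptions; the helper name grainMissingCheck() and its signature are illustrative:

```python
def grainMissingCheck(src_df, tar_df, grain_cols):
    """Return grains missing from the target and extra grains present only in the target."""
    src_grains = src_df.select(*grain_cols)
    tar_grains = tar_df.select(*grain_cols)

    missing_in_tar = src_grains.subtract(tar_grains)  # valid grains missing from the target
    extra_in_tar = tar_grains.subtract(src_grains)    # invalid/extra grains in the target
    return missing_in_tar, extra_in_tar
```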

Compare column values:

Now we have to compare the column values between the two data frames and generate the report column by column, showing both the expected value and the invalid value found for a particular grain.

Since we have already identified the missing records, we can now join the two data frames on the grain columns and compare the column values for all records whose grain matches in both data frames.

It is important to note that:

  • Grain columns can have null values, and we have to perform a null-safe join; otherwise, those records won’t join and will be skipped. I’m using the built-in PySpark eqNullSafe() function for this purpose.
  • Columns that contain double values, such as amount, will not match exactly, as different calculations might produce values with a small difference in decimal precision. Hence, for these columns, we have to subtract the values and flag the records whose difference is ≥ a threshold.

In the compareDf() function below, using the joined data frame, we select the grain and the specific column to be validated and compare them.
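A sketch of what compareDf() can look like under these constraints; the threshold value, the double/decimal type check, and the report format are assumptions rather than the article’s original code:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, FloatType

DOUBLE_THRESHOLD = 0.01  # assumed tolerance for double/decimal columns

def compareDf(src_df, tar_df, grain_cols):
    """Sequentially compare every non-grain column of the two data frames."""
    src_a, tar_a = src_df.alias("src"), tar_df.alias("tar")

    # Null-safe join on the grain columns so that null grains are not skipped
    join_cond = [src_a[c].eqNullSafe(tar_a[c]) for c in grain_cols]
    joined = src_a.join(tar_a, join_cond, "inner")

    for field in src_df.schema.fields:
        col = field.name
        if col in grain_cols:
            continue

        src_col, tar_col = F.col(f"src.{col}"), F.col(f"tar.{col}")
        if isinstance(field.dataType, (DoubleType, FloatType, DecimalType)):
            # Flag only differences at or above the threshold
            mismatch = F.abs(src_col - tar_col) >= DOUBLE_THRESHOLD
        else:
            mismatch = ~src_col.eqNullSafe(tar_col)

        diff_df = joined.filter(mismatch).select(
            *[F.col(f"src.{g}").alias(g) for g in grain_cols],
            src_col.alias(f"{col}_src"),
            tar_col.alias(f"{col}_tar"),
        )
        if diff_df.head(1):
            print(f"Mismatches found in column '{col}':")
            diff_df.show(10, truncate=False)
```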

Compare column values by running parallel jobs:

The limitation of the above function is that it compares the two data frames column by column sequentially, iterating over each column of the data frame.

Let’s look at some stats now. With 600+ columns and around 20 million rows, it takes about 15 seconds to compare each column. For all 600 columns, that comes to around 150 minutes (~2.5 hrs).

The column comparison tasks are independent of each other, so we can parallelize them. If we compare 8 columns in parallel, the time taken drops to around 19 minutes, roughly 8x faster.

In the compareDfParallel() function below, I’m creating a thread pool of the specified size using the built-in Python multiprocessing package. With parallel_thread_count = 8, eight columns will be compared in parallel. compareDFColumn() is the actual comparison function that is executed by each thread for a specific column.

The parallel thread count has to be chosen based on the capacity of the nodes in the cluster. We can use the number of driver cores specified in spark-submit as the parallel thread count.
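A sketch of the parallel version, assuming a ThreadPool from the standard multiprocessing package and the same join and threshold logic as above; the names, sample size, and report format are illustrative:

```python
from multiprocessing.pool import ThreadPool

from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, FloatType

DOUBLE_THRESHOLD = 0.01    # assumed tolerance for double/decimal columns
parallel_thread_count = 8  # ideally matched to the driver cores given to spark-submit

def compareDFColumn(joined, grain_cols, field):
    """Compare a single column; executed by each worker thread."""
    col = field.name
    src_col, tar_col = F.col(f"src.{col}"), F.col(f"tar.{col}")

    if isinstance(field.dataType, (DoubleType, FloatType, DecimalType)):
        mismatch = F.abs(src_col - tar_col) >= DOUBLE_THRESHOLD
    else:
        mismatch = ~src_col.eqNullSafe(tar_col)

    # Keep only a small data sample per column for the report
    sample = joined.filter(mismatch).select(
        *[F.col(f"src.{g}").alias(g) for g in grain_cols],
        src_col.alias(f"{col}_src"),
        tar_col.alias(f"{col}_tar"),
    ).take(10)
    return col, sample

def compareDfParallel(src_df, tar_df, grain_cols):
    """Compare all non-grain columns in parallel using a thread pool."""
    src_a, tar_a = src_df.alias("src"), tar_df.alias("tar")
    join_cond = [src_a[c].eqNullSafe(tar_a[c]) for c in grain_cols]
    joined = src_a.join(tar_a, join_cond, "inner")

    fields = [f for f in src_df.schema.fields if f.name not in grain_cols]

    pool = ThreadPool(parallel_thread_count)
    try:
        results = pool.starmap(
            compareDFColumn, [(joined, grain_cols, f) for f in fields]
        )
    finally:
        pool.close()
        pool.join()

    for col, sample in results:
        if sample:
            print(f"Mismatches found in column '{col}':")
            for row in sample:
                print(row)
```

A thread pool is enough here because each worker only submits Spark jobs; the heavy lifting still happens on the executors, so separate processes are not needed.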


Assuming all the above functions are present in the pyspark_utils.py file, below is an example of their usage with sample data.
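Since the original gist is not reproduced here, the following is a hedged sketch of what that usage can look like; the grain columns, file paths, and sample values are illustrative:

```python
from pyspark.sql import SparkSession

from pyspark_utils import compareDfParallel, grainDuplicateCheck, grainMissingCheck

spark = SparkSession.builder.appName("df_compare").getOrCreate()

grain_cols = ["account_id", "billing_period"]             # assumed grain

src_df = spark.read.parquet("/data/validation_dataset")   # validation (source)
tar_df = spark.read.parquet("/data/dev_dataset")          # dev (target)
src_df.persist()
tar_df.persist()

# 1. Duplicate grains in each data frame
grainDuplicateCheck(src_df, grain_cols).show(truncate=False)
grainDuplicateCheck(tar_df, grain_cols).show(truncate=False)

# 2. Missing and extra grains
missing_in_tar, extra_in_tar = grainMissingCheck(src_df, tar_df, grain_cols)
missing_in_tar.show(truncate=False)
extra_in_tar.show(truncate=False)

# 3. Column-by-column comparison in parallel; example of the kind of output produced:
#   Mismatches found in column 'billing_amount':
#   Row(account_id='A-1001', billing_period='2021-06',
#       billing_amount_src=1100.0, billing_amount_tar=1000.0)
compareDfParallel(src_df, tar_df, grain_cols)
```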

The output of compareDfParallel() will have columns with the suffix _src for the column value from the source data frame and _tar for the column value from the target data frame.

In the above example, billing_amount_src is 1100, which is the value of billing_amount in the source data frame, whereas billing_amount_tar is 1000, which is the value of billing_amount in the target data frame.

The output of the script can be piped to a file to get a consolidated report. The function can be modified to generate an HTML report instead of a text file report.


This is a simple approach to efficiently compare PySpark data frames based on grain and to generate reports with data samples.

Happy Datawarehousing!

