Processing a Slowly Changing Dimension Type 2 Using PySpark in AWS

Ryan Quigley
Towards Data Science
12 min read · Apr 17, 2019

With the emergence of new technologies that make data processing lightning fast, and cloud ecosystems that allow for flexibility, cost savings, security, and convenience, some data modeling philosophies appear to be used less frequently. One of these approaches is the star schema data architecture.

In a nutshell (I’m assuming those reading this are familiar with the concept), a star schema stores data in a dimensional fashion in order to afford a better user experience by providing faster and simpler query performance. In a dimensional model, data resides in a fact table or dimension table. A fact table holds measurements for an action and keys to related dimensions, and a dimension contains attributes for said action. A dimension can be static (such as one for time) or can save history (AKA slowly changing dimension type 2 AKA SCD2). It is important to note that star schemata are analytical systems, meaning they do not usually consume data directly from a user application. Rather, the data is stored in a highly normalized transactional system from where the star schema grabs the data utilizing some extract procedure.

Below is an example of a basic star schema for a sales program with one fact table and three dimensions (dim_customer_scd being an SCD2).

Star schemata are normally processed at the RDBMS layer, where UPDATEs and DELETEs (not discussed here) are allowed. However, processing time can be an issue when fact and SCD2 tables contain millions or billions of rows. In this day and age, it is common to turn to distributed cluster frameworks such as Spark or Hadoop. One can debate all day long which framework is preferable, but since that is not the subject of this discussion, I will explain how to process an SCD2 using Spark as the framework and PySpark as the scripting language in an AWS environment, with a heavy dose of SparkSQL.

At its most basic, the purpose of an SCD2 is to preserve history of changes. If a customer changes their last name or address, an SCD2 would allow users to link orders back to the customer and their attributes in the state they were at the time of the order. If I didn’t retain that history, it would be difficult to do so. Essentially, I can find the true state of a customer at any point in time.

dim_customer_scd (SCD2)

The dataset is very narrow, consisting of 12 columns. I can break those columns up into 3 sub-groups.

  • Keys: customer_dim_key
  • Non-dimensional Attributes: first_name, last_name, middle_initial, address, city, state, zip_code, customer_number
  • Row Metadata: eff_start_date, eff_end_date, is_current

Keys are usually created automatically and have no business value. They’re simply there to use as foreign keys to other tables and provide uniqueness to rows. Natural keys can sometimes be used, but are highly uncommon in a star schema.

Non-dimensional Attributes describe different aspects of the dataset’s subject, which is a customer in the example used. These attributes likely have business meaning and are used further downstream in reports and analyses. I will refer to these as simply “attributes” going forward.

Row metadata columns are specific to SCD2s and describe the state of the record. eff_start_date stores the date the record takes effect. eff_end_date stores the date the record expires (note: an unexpired record normally has a date in the far-flung future, like ‘9999-12-31’). is_current denotes whether the record is the most current one. is_current is redundant with the effective dates, yet some people like having this flag so they can avoid date logic in their queries or scripts.

I have four rows in this dataset. Looking at customer_number, I can see that the rows represent three distinct customers: John, Susan, and William. John and William (rows 1 and 4, respectively) each have only one entry in the table, meaning that their data has not changed since being inserted.

Susan, on the other hand, has two records (rows 2 and 3). Row 2 is a historical record, denoted by is_current = false, while row 3 is Susan’s most current information since is_current = true. Row 2 has a value of ‘Jones’ in the last_name column and row 3 contains ‘Harris’, while all the other attribute fields remain the same.
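To see why the effective dates matter, here is a minimal plain-Python sketch (not Spark) of a point-in-time lookup. The two rows mirror Susan's records above, trimmed to a few columns. The helper name record_as_of is hypothetical, and I am assuming both effective dates are inclusive.

```python
from datetime import date

# Susan's two rows, reduced to the columns needed for the lookup.
rows = [
    {"customer_dim_key": 2, "customer_number": 862447, "last_name": "Jones",
     "eff_start_date": date(2015, 3, 23), "eff_end_date": date(2018, 11, 17),
     "is_current": False},
    {"customer_dim_key": 3, "customer_number": 862447, "last_name": "Harris",
     "eff_start_date": date(2018, 11, 18), "eff_end_date": date(9999, 12, 31),
     "is_current": True},
]


def record_as_of(rows, customer_number, as_of):
    """Return the row in effect for the customer on the given date, or None."""
    for row in rows:
        if (row["customer_number"] == customer_number
                and row["eff_start_date"] <= as_of <= row["eff_end_date"]):
            return row
    return None


order_2017 = record_as_of(rows, 862447, date(2017, 6, 1))
order_2019 = record_as_of(rows, 862447, date(2019, 1, 15))
print(order_2017["last_name"], order_2019["last_name"])  # Jones Harris
```

An order placed in 2017 links to the ‘Jones’ row and one placed in 2019 links to the ‘Harris’ row, which is exactly the history an SCD2 is meant to preserve.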

When Susan’s last name was Jones, she had only one row in the table; a second row was added when her last name changed. What was the actual logic to effectuate this in the table? At a high level:

  1. A change of last name in the source system was identified.
  2. The existing record was expired: eff_end_date was updated to yesterday’s date and is_current was set to false.
  3. A new record was inserted with the new last name, an eff_start_date of today, an eff_end_date in the future, and is_current set to true.

Note: it is often better to use logical business dates for the eff_start_date and/or eff_end_date fields, but let’s keep it simple for the purpose of this discussion.
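The three steps listed above can be sketched in plain Python. This is only an illustration of the expire-and-insert idea, not the Spark code used later. The function apply_change and the HIGH_DATE constant are names I made up, and the dates are passed in explicitly rather than read from the clock.

```python
from datetime import date, timedelta

HIGH_DATE = date(9999, 12, 31)  # conventional "never expires" end date


def apply_change(current_row, new_attributes, today, new_key):
    """Return (expired_row, new_row) without mutating current_row."""
    # Expire the old version: it ends yesterday and is no longer current.
    expired = dict(current_row,
                   eff_end_date=today - timedelta(days=1),
                   is_current=False)
    # Build the new version: same customer, changed attributes, fresh metadata.
    new = dict(current_row, **new_attributes)
    new.update(customer_dim_key=new_key, eff_start_date=today,
               eff_end_date=HIGH_DATE, is_current=True)
    return expired, new


susan = {"customer_dim_key": 2, "customer_number": 862447,
         "last_name": "Jones", "eff_start_date": date(2015, 3, 23),
         "eff_end_date": HIGH_DATE, "is_current": True}

expired, new = apply_change(susan, {"last_name": "Harris"},
                            today=date(2018, 11, 18), new_key=3)
print(expired["eff_end_date"], new["eff_start_date"])  # 2018-11-17 2018-11-18
```

The original row is left untouched and two new rows come back. The Spark steps below take the same shape: nothing is updated in place.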

In an RDBMS, it would be easy to implement this. However, due to the various limitations on UPDATE capability in Spark, I have to do things differently.

Time to get to the details.

Step 1: Create the Spark session

I can go ahead and start our Spark session and create a variable for our target path in S3:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("scd2_demo").getOrCreate()
v_s3_path = "s3://mybucket/dim_customer_scd"

Step 2: Create SCD2 dataset (for demo purposes)

You can use the following script to generate your current SCD2, write it to Parquet, create a temp table, and view the results (I will use this pattern throughout to aid in description):

# ############## generate current_scd2 dataset ############## #
hd_current_scd2 = """
SELECT BIGINT(1) AS customer_dim_key,
STRING('John') AS first_name,
STRING('Smith') AS last_name,
STRING('G') AS middle_initial,
STRING('123 Main Street') AS address,
STRING('Springville') AS city,
STRING('VT') AS state,
STRING('01234-5678') AS zip_code,
BIGINT(289374) AS customer_number,
DATE('2014-01-01') AS eff_start_date,
DATE('9999-12-31') AS eff_end_date,
BOOLEAN(1) AS is_current
UNION
SELECT BIGINT(2) AS customer_dim_key,
STRING('Susan') AS first_name,
STRING('Jones') AS last_name,
STRING('L') AS middle_initial,
STRING('987 Central Avenue') AS address,
STRING('Central City') AS city,
STRING('MO') AS state,
STRING('49257-2657') AS zip_code,
BIGINT(862447) AS customer_number,
DATE('2015-03-23') AS eff_start_date,
DATE('2018-11-17') AS eff_end_date,
BOOLEAN(0) AS is_current
UNION
SELECT BIGINT(3) AS customer_dim_key,
STRING('Susan') AS first_name,
STRING('Harris') AS last_name,
STRING('L') AS middle_initial,
STRING('987 Central Avenue') AS address,
STRING('Central City') AS city,
STRING('MO') AS state,
STRING('49257-2657') AS zip_code,
BIGINT(862447) AS customer_number,
DATE('2018-11-18') AS eff_start_date,
DATE('9999-12-31') AS eff_end_date,
BOOLEAN(1) AS is_current
UNION
SELECT BIGINT(4) AS customer_dim_key,
STRING('William') AS first_name,
STRING('Chase') AS last_name,
STRING('X') AS middle_initial,
STRING('57895 Sharp Way') AS address,
STRING('Oldtown') AS city,
STRING('CA') AS state,
STRING('98554-1285') AS zip_code,
BIGINT(31568) AS customer_number,
DATE('2018-12-07') AS eff_start_date,
DATE('9999-12-31') AS eff_end_date,
BOOLEAN(1) AS is_current
"""
df_current_scd2 = spark.sql(hd_current_scd2)
df_current_scd2.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/current_scd2/")
df_current_scd2.createOrReplaceTempView("current_scd2")

# ############## review dataset ############## #
df_current_scd2 = spark.read.parquet(v_s3_path + "/current_scd2/*").orderBy("customer_dim_key")
df_current_scd2.show(10, False)

Step 3: Create customer dataset from source system (for demo purposes)

You can use the following script to generate your source data, which I will use to modify our SCD2:

# ############## generate customer_data dataset ############## #
hd_customer_data = """
SELECT BIGINT(289374) AS customer_number,
STRING('John') AS first_name,
STRING('Smith') AS last_name,
STRING('G') AS middle_initial,
STRING('456 Derry Court') AS address,
STRING('Springville') AS city,
STRING('VT') AS state,
STRING('01234-5678') AS zip_code
UNION
SELECT BIGINT(932574) AS customer_number,
STRING('Lisa') AS first_name,
STRING('Cohen') AS last_name,
STRING('S') AS middle_initial,
STRING('69846 Mason Road') AS address,
STRING('Atlanta') AS city,
STRING('GA') AS state,
STRING('26584-3591') AS zip_code
UNION
SELECT BIGINT(862447) AS customer_number,
STRING('Susan') AS first_name,
STRING('Harris') AS last_name,
STRING('L') AS middle_initial,
STRING('987 Central Avenue') AS address,
STRING('Central City') AS city,
STRING('MO') AS state,
STRING('49257-2657') AS zip_code
UNION
SELECT BIGINT(31568) AS customer_number,
STRING('William') AS first_name,
STRING('Chase') AS last_name,
STRING('X') AS middle_initial,
STRING('57895 Sharp Way') AS address,
STRING('Oldtown') AS city,
STRING('CA') AS state,
STRING('98554-1285') AS zip_code
"""
df_customer_data = spark.sql(hd_customer_data)
df_customer_data.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/customer_data/")
df_customer_data.createOrReplaceTempView("customer_data")

# ############## review dataset ############## #
df_customer_data = spark.read.parquet(v_s3_path + "/customer_data/*").orderBy("customer_number")
df_customer_data.show(10, False)

Current State of the SCD2

Customer_Data from Source System

Step 4: Manually find changes (solely for the purposes of the topic)

Remember that the data from the source system feeds into our SCD2, so I need to compare the two datasets to determine if there are any differences. After a manual survey, I see:

  • John Smith has changed their address
  • Lisa Cohen is a new customer
  • Attributes for William Chase and Susan Harris are unchanged

Now I need to write some logic which accomplishes all of the following:

  • Creates the new entry for John Smith
  • Expires the current entry for John Smith
  • Keeps William Chase and Susan Harris’ records unchanged
  • Adds an entry for our new customer, Lisa Cohen
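Before moving to SQL, the manual survey above can be expressed as a small plain-Python sketch. The rows are trimmed to two attributes to keep it short, and the classify helper and COMPARED tuple are hypothetical names. The Spark steps that follow do the same comparison across all the attributes.

```python
# Current SCD2 and source rows, trimmed to the columns needed for comparison.
current_scd2 = [
    {"customer_number": 289374, "last_name": "Smith", "address": "123 Main Street", "is_current": True},
    {"customer_number": 862447, "last_name": "Jones", "address": "987 Central Avenue", "is_current": False},
    {"customer_number": 862447, "last_name": "Harris", "address": "987 Central Avenue", "is_current": True},
    {"customer_number": 31568, "last_name": "Chase", "address": "57895 Sharp Way", "is_current": True},
]
customer_data = [
    {"customer_number": 289374, "last_name": "Smith", "address": "456 Derry Court"},
    {"customer_number": 932574, "last_name": "Cohen", "address": "69846 Mason Road"},
    {"customer_number": 862447, "last_name": "Harris", "address": "987 Central Avenue"},
    {"customer_number": 31568, "last_name": "Chase", "address": "57895 Sharp Way"},
]
COMPARED = ("last_name", "address")  # attributes checked for differences


def classify(source_rows, scd2_rows):
    """Label each source customer_number as 'new', 'changed', or 'unchanged'."""
    known = {row["customer_number"] for row in scd2_rows}
    current = {row["customer_number"]: row for row in scd2_rows if row["is_current"]}
    status = {}
    for source in source_rows:
        number = source["customer_number"]
        if number not in known:
            status[number] = "new"
        elif number in current and any(
                source[col] != current[number][col] for col in COMPARED):
            status[number] = "changed"
        else:
            status[number] = "unchanged"
    return status


statuses = classify(customer_data, current_scd2)
print(statuses)
# {289374: 'changed', 932574: 'new', 862447: 'unchanged', 31568: 'unchanged'}
```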

Step 5: Create new current records for existing customers

In order to logically capture this address change, I need to compare the current SCD2 and the source data (as I did manually above) and flag changes. I also need to be mindful of our row metadata fields to ensure that I am expiring and starting records using the appropriate dates.

Luckily, I can do this in one block of SQL and write the results out to a file in S3:

  • Join the customer_data dataset to the current SCD2 dataset on customer_number and the current record
  • Check for differences in the WHERE clause
  • Select all the attributes from the source dataset
  • Select the customer_dim_key from the current SCD2 dataset (for use in step 6)
  • Set eff_start_date to today
  • Set eff_end_date to the future
  • Set is_current to 1

# ############## create new current recs dataset ############## #
hd_new_curr_recs = """
SELECT t.customer_dim_key,
s.customer_number,
s.first_name,
s.last_name,
s.middle_initial,
s.address,
s.city,
s.state,
s.zip_code,
DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'America/Los_Angeles'))
AS eff_start_date,
DATE('9999-12-31') AS eff_end_date,
BOOLEAN(1) AS is_current
FROM customer_data s
INNER JOIN current_scd2 t
ON t.customer_number = s.customer_number
AND t.is_current = True
WHERE NVL(s.first_name, '') <> NVL(t.first_name, '')
OR NVL(s.last_name, '') <> NVL(t.last_name, '')
OR NVL(s.middle_initial, '') <> NVL(t.middle_initial, '')
OR NVL(s.address, '') <> NVL(t.address, '')
OR NVL(s.city, '') <> NVL(t.city, '')
OR NVL(s.state, '') <> NVL(t.state, '')
OR NVL(s.zip_code, '') <> NVL(t.zip_code, '')
"""
df_new_curr_recs = spark.sql(hd_new_curr_recs)
df_new_curr_recs.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_curr_recs/")
df_new_curr_recs.createOrReplaceTempView("new_curr_recs")

# ############## review dataset ############## #
df_new_curr_recs = spark.read.parquet(v_s3_path + "/new_curr_recs/*").orderBy("customer_number")
df_new_curr_recs.show(10, False)

The above logic runs through all the records and finds the one change I manually observed. As you can see below, a new record was created for John Smith with his new address, and the row metadata shows this record takes effect today and expires in the future.

New Record for John Smith
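The NVL wrappers in the WHERE clause matter because of how SQL treats NULL: a comparison such as NULL <> 'G' evaluates to NULL rather than true, so the row would be filtered out and the change missed. Below is a small plain-Python sketch of that behavior. sql_not_equal and nvl are stand-ins I wrote to mimic the SQL semantics, with None playing the role of NULL.

```python
def sql_not_equal(a, b):
    """Mimic SQL's <> operator: any comparison involving NULL yields NULL (None)."""
    if a is None or b is None:
        return None
    return a != b


def nvl(value, default):
    """Mimic NVL: substitute the default when the value is NULL (None)."""
    return default if value is None else value


old_initial, new_initial = None, "G"

# Without NVL the predicate is NULL, and WHERE only keeps rows where it is true.
print(sql_not_equal(old_initial, new_initial))                    # None
# With NVL the NULL becomes '', so the difference is detected.
print(sql_not_equal(nvl(old_initial, ""), nvl(new_initial, "")))  # True
```

One side effect of this approach is that NULL and an empty string are treated as the same value. Spark SQL also has a null-safe equality operator, <=>, so NOT (s.address <=> t.address) would be another way to write the comparison if that distinction matters.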

Step 6: Find previous current records to expire

Now that I have a new current record for a customer that already exists, I need to expire the previous current record. I included the customer_dim_key of the previous current record in the prior dataset, so I isolate it for future use.

# ########### isolate keys of records to be modified ########### #
df_modfied_keys = df_new_curr_recs.select("customer_dim_key")
df_modfied_keys.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/modfied_keys/")
df_modfied_keys.createOrReplaceTempView("modfied_keys")

Records to be Modified

Step 7: Expire previous current records

Now I can go about expiring that prior record, while again being mindful of our row metadata fields and modifying them correctly. Recall that I cannot update the record, so I have to create a new instance of it.

In this block of code, I will:

  • Join the current SCD2 dataset to the modified_keys dataset (spelled modfied_keys in the code) on customer_dim_key
  • Ensure that I am expiring the current record by double-checking if is_current = 1
  • Select all the attributes and eff_start_date from the current SCD2 dataset
  • Set eff_end_date to yesterday
  • Set is_current to 0

# ############## create new hist recs dataset ############## #
hd_new_hist_recs = """
SELECT t.customer_dim_key,
t.customer_number,
t.first_name,
t.last_name,
t.middle_initial,
t.address,
t.city,
t.state,
t.zip_code,
t.eff_start_date,
DATE_SUB(
DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'America/Los_Angeles')), 1
) AS eff_end_date,
BOOLEAN(0) AS is_current
FROM current_scd2 t
INNER JOIN modfied_keys k
ON k.customer_dim_key = t.customer_dim_key
WHERE t.is_current = True
"""
df_new_hist_recs = spark.sql(hd_new_hist_recs)
df_new_hist_recs.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_hist_recs/")
df_new_hist_recs.createOrReplaceTempView("new_hist_recs")

# ############## review dataset ############## #
df_new_hist_recs = spark.read.parquet(v_s3_path + "/new_hist_recs/*").orderBy("customer_number")
df_new_hist_recs.show(10, False)

The above logic expires the record properly and writes to its own dataset:

Expired Record for John Smith

Step 8: Isolate unaffected records

The records for William Chase and Susan Harris had no changes and they need to remain in the target dataset, so the next step is to place those in their own dataset.

This logic will:

  • Left join the current SCD2 dataset to the modified_keys dataset on customer_dim_key
  • Keep only records that are not in the modified_keys dataset
  • Keep all the attributes and row metadata the same

# ############## create unaffected recs dataset ############## #
hd_unaffected_recs = """
SELECT s.customer_dim_key,
s.customer_number,
s.first_name,
s.last_name,
s.middle_initial,
s.address,
s.city,
s.state,
s.zip_code,
s.eff_start_date,
s.eff_end_date,
s.is_current
FROM current_scd2 s
LEFT OUTER JOIN modfied_keys k
ON k.customer_dim_key = s.customer_dim_key
WHERE k.customer_dim_key IS NULL
"""
df_unaffected_recs = spark.sql(hd_unaffected_recs)
df_unaffected_recs.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/unaffected_recs/")
df_unaffected_recs.createOrReplaceTempView("unaffected_recs")

# ############## review dataset ############## #
df_unaffected_recs = spark.read.parquet(v_s3_path + "/unaffected_recs/*").orderBy("customer_number")
df_unaffected_recs.show(10, False)

The unaffected records are indeed isolated:

Unaffected Recs
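The LEFT OUTER JOIN with an IS NULL filter used in this step is often called an anti-join: keep the rows on the left that have no match on the right. A rough plain-Python equivalent, using the demo keys and a hypothetical helper named left_anti_join, looks like this:

```python
def left_anti_join(left_rows, right_keys, key):
    """Keep rows from left_rows whose key value does not appear in right_keys."""
    excluded = set(right_keys)
    return [row for row in left_rows if row[key] not in excluded]


current_scd2 = [
    {"customer_dim_key": 1, "first_name": "John", "is_current": True},
    {"customer_dim_key": 2, "first_name": "Susan", "is_current": False},
    {"customer_dim_key": 3, "first_name": "Susan", "is_current": True},
    {"customer_dim_key": 4, "first_name": "William", "is_current": True},
]
modified_keys = [1]  # John's current record is the one being replaced

unaffected = left_anti_join(current_scd2, modified_keys, "customer_dim_key")
print([row["customer_dim_key"] for row in unaffected])  # [2, 3, 4]
```

Note that Susan's historical row (key 2) is carried forward along with the current rows, which is what preserves history from one run to the next. Spark SQL also accepts LEFT ANTI JOIN, which expresses the same thing more directly.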

Step 9: Create records for new customers

Lisa Cohen is a new customer and therefore does not yet exist in our SCD2.

The following logic:

  • Left joins the customer_data dataset to the current SCD2 dataset on customer_number
  • Keeps only records that are not in the current SCD2 dataset
  • Uses all the attributes from the source
  • Sets eff_start_date to today
  • Sets eff_end_date to the future
  • Sets is_current to 1

# ############## create new recs dataset ############## #
hd_new_cust = """
SELECT s.customer_number,
s.first_name,
s.last_name,
s.middle_initial,
s.address,
s.city,
s.state,
s.zip_code,
DATE(FROM_UTC_TIMESTAMP(CURRENT_TIMESTAMP, 'America/Los_Angeles'))
AS eff_start_date,
DATE('9999-12-31') AS eff_end_date,
BOOLEAN(1) AS is_current
FROM customer_data s
LEFT OUTER JOIN current_scd2 t
ON t.customer_number = s.customer_number
WHERE t.customer_number IS NULL
"""
df_new_cust = spark.sql(hd_new_cust)
df_new_cust.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_cust/")
df_new_cust.createOrReplaceTempView("new_cust")

# ############## review dataset ############## #
df_new_cust = spark.read.parquet(v_s3_path + "/new_cust/*").orderBy("customer_number")
df_new_cust.show(10, False)

Here is the result; the new customer is in the expected format:

New Customer Record

Step 10: Combine the datasets for new SCD2

I’ve created the four datasets needed to create the new iteration of our SCD2:

  • Dataset of new current records for existing customers (“new_curr_recs”)
  • Dataset of expiring previous current records for existing customers (“new_hist_recs”)
  • Dataset of records which aren’t being modified (“unaffected_recs”)
  • Dataset of new, previously unseen, customers (“new_cust”)

All that remains is to merge them together for the final product. In this last bit of code, I will:

  • Find the max customer_dim_key value
  • Union the two datasets that need a new customer_dim_key: new_cust and new_curr_recs
  • To create a new customer_dim_key, use the ROW_NUMBER() function and add the max customer_dim_key value (to preserve consecutiveness and uniqueness)
  • Union the prior unioned dataset to unaffected_recs and new_hist_recs

# ############## create new scd2 dataset ############## #
v_max_key = spark.sql(
"SELECT STRING(MAX(customer_dim_key)) FROM current_scd2"
).collect()[0][0]
hd_new_scd2 = """
WITH a_cte
AS (
SELECT x.first_name, x.last_name,
x.middle_initial, x.address,
x.city, x.state, x.zip_code,
x.customer_number, x.eff_start_date,
x.eff_end_date, x.is_current
FROM new_cust x
UNION ALL
SELECT y.first_name, y.last_name,
y.middle_initial, y.address,
y.city, y.state, y.zip_code,
y.customer_number, y.eff_start_date,
y.eff_end_date, y.is_current
FROM new_curr_recs y
)
, b_cte
AS (
SELECT ROW_NUMBER() OVER(ORDER BY a.eff_start_date)
+ BIGINT('{v_max_key}') AS customer_dim_key,
a.first_name, a.last_name,
a.middle_initial, a.address,
a.city, a.state, a.zip_code,
a.customer_number, a.eff_start_date,
a.eff_end_date, a.is_current
FROM a_cte a
)
SELECT customer_dim_key, first_name, last_name,
middle_initial, address,
city, state, zip_code,
customer_number, eff_start_date,
eff_end_date, is_current
FROM b_cte
UNION ALL
SELECT customer_dim_key, first_name, last_name,
middle_initial, address,
city, state, zip_code,
customer_number, eff_start_date,
eff_end_date, is_current
FROM unaffected_recs
UNION ALL
SELECT customer_dim_key, first_name, last_name,
middle_initial, address,
city, state, zip_code,
customer_number, eff_start_date,
eff_end_date, is_current
FROM new_hist_recs
"""
df_new_scd2 = spark.sql(hd_new_scd2.replace("{v_max_key}", v_max_key))
df_new_scd2.coalesce(1).write.mode("overwrite").parquet(v_s3_path + "/new_scd2/")

# ############## review dataset ############## #
df_new_scd2 = spark.read.parquet(v_s3_path + "/new_scd2/*").orderBy("customer_dim_key")
df_new_scd2.show(10, False)

New SCD2

Everything looks as expected in the new version of the SCD2:

  • Records for William Chase and Susan Harris have not changed
  • John Smith has a record with an old address expired on 4/14/2019
  • John Smith has a new record with a new address effective 4/15/2019
  • Lisa Cohen has a new record effective 4/15/2019
  • Records which had existed in the prior iteration of the dataset retain their customer_dim_keys so the fact tables in the star schema do not need to be remapped
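The key assignment is the part that keeps existing customer_dim_keys stable, so here is a plain-Python sketch of the ROW_NUMBER() plus max-key idea. The helper assign_keys is hypothetical, and the 2019-04-15 dates are taken from the results above.

```python
from datetime import date


def assign_keys(rows, max_existing_key, order_by="eff_start_date"):
    """Number rows 1..n in order_by order, offset by the current max key."""
    ordered = sorted(rows, key=lambda row: row[order_by])
    return [dict(row, customer_dim_key=max_existing_key + position)
            for position, row in enumerate(ordered, start=1)]


rows_needing_keys = [
    # From new_cust: no key yet.
    {"first_name": "Lisa", "eff_start_date": date(2019, 4, 15)},
    # From new_curr_recs: still carrying the key of the record it replaces.
    {"customer_dim_key": 1, "first_name": "John",
     "eff_start_date": date(2019, 4, 15)},
]

keyed = assign_keys(rows_needing_keys, max_existing_key=4)
print([(row["first_name"], row["customer_dim_key"]) for row in keyed])
# [('Lisa', 5), ('John', 6)]
```

Python's sort is stable, so ties keep their input order here. In Spark, rows that tie on the ORDER BY column can be numbered in any order, so which of the two new rows gets 5 and which gets 6 is not guaranteed; adding customer_number to the ORDER BY would make it repeatable.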

The new SCD2 is stored in S3 and can be used as you wish.

Some notes:

  1. The performance is excellent. In my production environment, the source table has 382 columns and ~7 million records and the SCD2 has 81 columns with ~110 million records. It takes ~10 minutes, on average, to process the data. In a standard RDBMS, it completes in ~180 minutes. This is a 94% improvement in processing time.
  2. The S3 paths I used in this discussion do not truly apply in a real-life business scenario, but work perfectly fine for this teaching scenario.

It was difficult for me to find anything online regarding the processing of SCD2s and star schemata using Spark. Frankly, it was quite frustrating, and I was very happy to figure it out myself. Even more so, I was pleasantly surprised to see that it did not take a degree in rocket science to attain this. My hope is that any frustration you may have is brief and that this discussion helps you in your big data, star schema scripting endeavors.

Ryan studied English Lit in college & wrote a 63 page thesis on differences b/w there, their, & they’re (not really). He is a Principal Data Engineer @ Intuit.