
Data Lake Change Data Capture (CDC) using Apache Hudi on Amazon EMR – Part 2: Process

Easily process data changes over time from your database to Data Lake using Apache Hudi on Amazon EMR

Image by Gino Crescoli from Pixabay

In the previous article, linked below, we discussed how to seamlessly collect CDC data using Amazon Database Migration Service (DMS).

https://towardsdatascience.com/data-lake-change-data-capture-cdc-using-amazon-database-migration-service-part-1-capture-b43c3422aad4

This article will demonstrate how to process that CDC data so that a near real-time representation of your database is achieved in your data lake. We will use the combined power of Apache Hudi and Amazon EMR to perform this operation. Apache Hudi is an open-source data management framework used to simplify incremental data processing in near real time.

We will kick-start the process by creating a new EMR cluster:

$ aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Spark Name=Hive --ebs-root-volume-size 10 --ec2-attributes '{"KeyName":"roopikadf","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-097e5d6e","EmrManagedSlaveSecurityGroup":"sg-088d03d676ac73013","EmrManagedMasterSecurityGroup":"sg-062368f478fb07c11"}' --service-role EMR_DefaultRole --release-label emr-6.0.0 --name 'Training' --instance-groups '[{"InstanceCount":3,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 --bootstrap-actions Path=s3://aws-analytics-course/job/energy/emr.sh,Name=InstallPythonLibs

After creating the EMR cluster, log on to the master node using SSH and issue the following commands to copy the Apache Hudi JAR files to S3.

$ aws s3 cp /usr/lib/hudi/hudi-spark-bundle.jar s3://aws-analytics-course/hudi/jar/
upload: ../../usr/lib/hudi/hudi-spark-bundle.jar to s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar
$ aws s3 cp /usr/lib/spark/external/lib/spark-avro.jar s3://aws-analytics-course/hudi/jar/
upload: ../../usr/lib/spark/external/lib/spark-avro.jar to s3://aws-analytics-course/hudi/jar/spark-avro.jar
$ aws s3 ls s3://aws-analytics-course/hudi/jar/
2020-10-21 17:00:41   23214176 hudi-spark-bundle.jar
2020-10-21 17:00:56     101212 spark-avro.jar

Now create a new EMR notebook and upload the notebook available in the repository below (hudi/hudi.ipynb):

$ git clone https://github.com/mkukreja1/blogs.git

Create a Spark session using the Hudi JAR files uploaded to S3 in the previous step.

from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, DateType, DecimalType
from pyspark.sql.functions import *
from pyspark.sql.functions import concat, lit, col
spark = pyspark.sql.SparkSession.builder.appName("Product_Price_Tracking") \
     .config("spark.jars", "s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar,s3://aws-analytics-course/hudi/jar/spark-avro.jar") \
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
     .config("spark.sql.hive.convertMetastoreParquet", "false") \
     .getOrCreate()
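Note that spark.jars generally needs to be in place before the Spark session starts; in an EMR notebook, a Sparkmagic %%configure cell run before any Spark code is a reliable way to apply the same settings at session creation. Below is a minimal sketch of that alternative, assuming the same JAR paths in S3:

%%configure -f
{
  "conf": {
    "spark.jars": "s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar,s3://aws-analytics-course/hudi/jar/spark-avro.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false"
  }
}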

Let's read the CDC files. We will start by reading the full-load file.

TABLE_NAME = "coal_prod"
S3_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/LOAD00000001.csv"
S3_HUDI_DATA = "s3://aws-analytics-course/hudi/data/coal_prod"
coal_prod_schema = StructType([StructField("Mode", StringType()),
                               StructField("Entity", StringType()),
                               StructField("Code", StringType()),
                               StructField("Year", IntegerType()),
                               StructField("Production", DecimalType(10,2)),
                               StructField("Consumption", DecimalType(10,2))
                               ])
df_coal_prod = spark.read.csv(S3_RAW_DATA, header=False, schema=coal_prod_schema)
df_coal_prod.show(5)
+----+-----------+----+----+----------+-----------+
|Mode|     Entity|Code|Year|Production|Consumption|
+----+-----------+----+----+----------+-----------+
|   I|Afghanistan| AFG|1949|      0.04|       0.00|
|   I|Afghanistan| AFG|1950|      0.11|       0.00|
|   I|Afghanistan| AFG|1951|      0.12|       0.00|
|   I|Afghanistan| AFG|1952|      0.14|       0.00|
|   I|Afghanistan| AFG|1953|      0.13|       0.00|
+----+-----------+----+----+----------+-----------+
only showing top 5 rows

Apache Hudi requires a primary key to uniquely identify each record. Typically, a sequentially generated primary key is best for this purpose, but our table does not have one. To solve this issue, let us generate a primary key by concatenating the Entity and Year columns. The key column below will be used as the primary key.

df_coal_prod=df_coal_prod.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_f=df_coal_prod.drop(df_coal_prod.Mode)
df_coal_prod_f.show(5)
+-----------+----+----+----------+-----------+---------------+
|     Entity|Code|Year|Production|Consumption|            key|
+-----------+----+----+----------+-----------+---------------+
|Afghanistan| AFG|1949|      0.04|       0.00|Afghanistan1949|
|Afghanistan| AFG|1950|      0.11|       0.00|Afghanistan1950|
|Afghanistan| AFG|1951|      0.12|       0.00|Afghanistan1951|
|Afghanistan| AFG|1952|      0.14|       0.00|Afghanistan1952|
|Afghanistan| AFG|1953|      0.13|       0.00|Afghanistan1953|
+-----------+----+----+----------+-----------+---------------+
only showing top 5 rows

We are now ready to save the data in Hudi format. Since this is the very first time we are saving this table, we will use the "bulk_insert" operation and mode=overwrite. Also notice that we are using the "key" column as the record key.

df_coal_prod_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "bulk_insert") \
            .option("hoodie.datasource.write.recordkey.field", "key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .mode("overwrite") \
            .save(S3_HUDI_DATA)

We can now read the newly created Hudi table.

df_final = spark.read.format("org.apache.hudi") \
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)
+--------+
|count(1)|
+--------+
|    6282|
+--------+

+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|Entity|Code|Year|Production|Consumption|      key|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|     20201021215857|20201021215857_54...|         India2013|               default|8fae00ae-34e7-45e...| India| IND|2013|   2841.01|       0.00|India2013|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+

Notice that we have 6282 rows from the full load, including the data for key India2013. This key will be updated in the next operation, so it is important to note its history. We will now read the incremental data.

The incremental data came with 4 rows: 2 rows were inserted, one row was updated, and one row was deleted. We will handle the inserted and updated rows first. Notice the filter for "Mode IN ('U', 'I')" below.

S3_INCR_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/20200808-*.csv"
df_coal_prod_incr = spark.read.csv(S3_INCR_RAW_DATA, header=False, schema=coal_prod_schema)
df_coal_prod_incr_u_i=df_coal_prod_incr.filter("Mode IN ('U', 'I')")
df_coal_prod_incr_u_i=df_coal_prod_incr_u_i.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_incr_u_i.show(5)
df_coal_prod_incr_u_i_f=df_coal_prod_incr_u_i.drop(df_coal_prod_incr_u_i.Mode)
df_coal_prod_incr_u_i_f.show()
+----+------+----+----+----------+-----------+---------+
|Mode|Entity|Code|Year|Production|Consumption|      key|
+----+------+----+----+----------+-----------+---------+
|   I| India| IND|2015|   4056.33|       0.00|India2015|
|   I| India| IND|2016|   4890.45|       0.00|India2016|
|   U| India| IND|2013|   2845.66|     145.66|India2013|
+----+------+----+----+----------+-----------+---------+

+------+----+----+----------+-----------+---------+
|Entity|Code|Year|Production|Consumption|      key|
+------+----+----+----------+-----------+---------+
| India| IND|2015|   4056.33|       0.00|India2015|
| India| IND|2016|   4890.45|       0.00|India2016|
| India| IND|2013|   2845.66|     145.66|India2013|
+------+----+----+----------+-----------+---------+

We are now ready to perform a Hudi upsert operation on the incremental data. Since this table already exists, this time we will use the append mode.

df_coal_prod_incr_u_i_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.upsert.shuffle.parallelism", 20) \
            .option("hoodie.datasource.write.recordkey.field", "key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .mode("append") \
            .save(S3_HUDI_DATA)

Check the underlying data. Notice that the 2 new rows have been added, so the table count has gone up from 6282 to 6284. Also note that the row for key India2013 has now been updated in the Production and Consumption columns.

df_final = spark.read.format("org.apache.hudi") \
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)
+--------+
|count(1)|
+--------+
|    6284|
+--------+

+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|Entity|Code|Year|Production|Consumption|      key|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
|     20201021220359|20201021220359_0_...|         India2013|               default|8fae00ae-34e7-45e...| India| IND|2013|   2845.66|     145.66|India2013|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
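Because Hudi retains the commit timeline, we can also look back at how this row changed over time. Below is a minimal sketch of an incremental query that returns only the records written after the initial bulk insert; in recent Hudi releases the option key is hoodie.datasource.query.type, while older 0.5.x releases used hoodie.datasource.view.type. The begin instant is taken from the _hoodie_commit_time shown in the earlier output.

# Commit time of the initial bulk insert, taken from the output above
BEGIN_COMMIT_TIME = "20201021215857"

# Incremental queries read the table base path, not a glob of Parquet files
df_incr_view = spark.read.format("org.apache.hudi") \
          .option("hoodie.datasource.query.type", "incremental") \
          .option("hoodie.datasource.read.begin.instanttime", BEGIN_COMMIT_TIME) \
          .load(S3_HUDI_DATA)
df_incr_view.select("_hoodie_commit_time", "key", "Production", "Consumption").show()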

Now we would like to deal with the one deleted row.

df_coal_prod_incr_d=df_coal_prod_incr.filter("Mode IN ('D')")
df_coal_prod_incr_d=df_coal_prod_incr_d.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key"))
df_coal_prod_incr_d_f=df_coal_prod_incr_d.drop(df_coal_prod_incr_d.Mode)
df_coal_prod_incr_d_f.show()
+------+----+----+----------+-----------+---------+
|Entity|Code|Year|Production|Consumption|      key|
+------+----+----+----------+-----------+---------+
| India| IND|2010|   2710.54|       0.00|India2010|
+------+----+----+----------+-----------+---------+

We can do this with a Hudi upsert operation, but we need an extra option for deletes: hoodie.datasource.write.payload.class=org.apache.hudi.EmptyHoodieRecordPayload.

df_coal_prod_incr_d_f.write.format("org.apache.hudi") \
            .option("hoodie.table.name", TABLE_NAME) \
            .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \
            .option("hoodie.datasource.write.operation", "upsert") \
            .option("hoodie.upsert.shuffle.parallelism", 20) \
            .option("hoodie.datasource.write.recordkey.field", "key") \
            .option("hoodie.datasource.write.precombine.field", "key") \
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
            .mode("append") \
            .save(S3_HUDI_DATA)

We can now check the results. Since one row has been deleted, the count has gone down from 6284 to 6283. Also, the query for the deleted row came back empty. Everything worked as desired.

df_final = spark.read.format("org.apache.hudi") \
          .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet")
df_final.registerTempTable("coal_prod")
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2010'").show(5)
+--------+
|count(1)|
+--------+
|    6283|
+--------+

+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|Entity|Code|Year|Production|Consumption|key|
+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
+-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
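To wrap up, the upsert and delete steps above can be folded into a single helper that applies one DMS CDC batch end to end. This is only a sketch built from the options already used in this article; the function name and structure are illustrative and not part of the article's repository.

from pyspark.sql.functions import concat, lit, col

def apply_cdc_batch(df_incr, table_name, target_path):
    """Apply one DMS CDC batch (inserts, updates, deletes) to a Hudi table. Sketch only."""
    hudi_options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.upsert.shuffle.parallelism": 20,
        "hoodie.datasource.write.recordkey.field": "key",
        "hoodie.datasource.write.precombine.field": "key",
    }

    # Build the same composite key used throughout this article
    df_keyed = df_incr.select("*", concat(col("Entity"), lit(""), col("Year")).alias("key"))

    # Inserts and updates become a plain upsert
    df_upserts = df_keyed.filter("Mode IN ('U', 'I')").drop("Mode")
    if df_upserts.count() > 0:
        df_upserts.write.format("org.apache.hudi") \
            .options(**hudi_options) \
            .mode("append") \
            .save(target_path)

    # Deletes are an upsert with the empty payload class
    df_deletes = df_keyed.filter("Mode = 'D'").drop("Mode")
    if df_deletes.count() > 0:
        df_deletes.write.format("org.apache.hudi") \
            .options(**hudi_options) \
            .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \
            .mode("append") \
            .save(target_path)

# Example usage with the incremental file read earlier:
# apply_cdc_batch(df_coal_prod_incr, TABLE_NAME, S3_HUDI_DATA)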

All the code used in this article can be found at the link below:

mkukreja1/blogs

I hope this article was helpful. CDC using Amazon Database Migration Service is covered as part of the AWS Big Data Analytics course offered by Datafence Cloud Academy. The course is taught online by me on weekends.

