Implement SCD Type 2 via Spark DataFrames

While working on data pipeline projects, a programmer frequently has to deal with slowly changing dimension (SCD) data.

In this post I will jot down an implementation of SCD Type 2 using Apache Spark. All the operations are done in memory after reading the source and target data. The code is written in Scala, but you can implement similar logic in Python too.

Overview of Data:

  • Source data
  • Target data (existing records)
  • Final output

Create Target Data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val targetList = List(
  Row(1, "Hello!", false, false, "2020-01-01", "2021-01-31"),
  Row(1, "Hadoop", true, false, "2019-01-01", "9999-12-31"),
  Row(2, "Hadoop with Java", true, false, "2019-02-01", "9999-12-31"),
  Row(3, "old system", true, false, "2019-02-01", "9999-12-31"))
val schema_target = StructType(List(
  StructField("id", IntegerType, true),
  StructField("attr", StringType, true),
  StructField("is_current", BooleanType, true),
  StructField("is_deleted", BooleanType, true),
  StructField("start_date", StringType, true),
  StructField("end_date", StringType, true)))
val targetDf = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(targetList), schema_target)

In reality we would normally read this data from Hive, a SQL database, or a NoSQL system.
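As a minimal sketch of that, the in-memory target above could be replaced with a table read; the table name `dim.course_dim` here is an assumption for illustration only:

```scala
// Hypothetical: read the existing dimension from a Hive table instead of
// building it in memory. Requires a SparkSession with Hive support enabled.
val targetDf = sparkSession.read.table("dim.course_dim")
```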

Source Data:

//Sample Source Data; todayDate is used as the effective date of this load
import org.apache.spark.sql.functions._

val todayDate = java.time.LocalDate.now.toString
val sourcelist = List(
  Row(1, "Spark"),
  Row(2, "PySpark!"),
  Row(4, "Scala!"))
val schema_source = StructType(List(
  StructField("src_id", IntegerType, true),
  StructField("src_attr", StringType, true)))
val sourceDf = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(sourcelist), schema_source)
// Enrich the source with SCD dates so it lines up with the target columns;
// these two columns are referenced by the join below
val newsourcedf = sourceDf
  .withColumn("src_start_date", lit(todayDate))
  .withColumn("src_end_date", lit("9999-12-31"))

Now, we can do a full outer join of these two DataFrames.

As shown in the following code snippet, the full outer join type is used and the join keys are the id and end_date columns. A new column, action, is also added to record which action needs to be taken for each record.

There are four possibilities for the actions:

  • UPSERT: attributes have changed in the source and the existing records need to be expired and new records need to be inserted.
  • DELETE: business keys no longer exist in source table and the records in target table need to be deleted logically.
  • INSERT: new business keys exist in source that need to be inserted into the target table directly.
  • NOACTION: no changes to the attributes, or the records in the target table are not current.
val joinDataDf = targetDf.join(newsourcedf,
  targetDf("id") === newsourcedf("src_id")
    && targetDf("end_date") === newsourcedf("src_end_date"), "full")
val enrichedActionDf = joinDataDf.withColumn("action",
  when(joinDataDf("attr") =!= joinDataDf("src_attr"), "Upsert")
    .when(joinDataDf("src_attr").isNull && joinDataDf("is_current"), "Delete")
    .when(joinDataDf("id").isNull, "Insert")
    .otherwise("NoAction"))

Now that the action column is populated, let's implement each action. The snippets below take care of that.

val column_names = Seq("id", "attr", "is_current",
  "is_deleted", "start_date", "end_date")
val df_NoAction = enrichedActionDf.filter($"action" === "NoAction")
  .select(column_names.map(c => col(c)): _*)
val df_insert = enrichedActionDf.filter($"action" === "Insert")
  .select(enrichedActionDf("src_id") as "id",
    enrichedActionDf("src_attr") as "attr",
    lit(true) as "is_current", lit(false) as "is_deleted",
    enrichedActionDf("src_start_date") as "start_date",
    enrichedActionDf("src_end_date") as "end_date")
val df_delete = enrichedActionDf.filter($"action" === "Delete")
  .withColumn("end_date", lit(todayDate))
  .withColumn("is_deleted", lit(true))
  .withColumn("is_current", lit(false))
  .select(column_names.map(c => col(c)): _*)
// Expire the old version of a changed record, then insert the new version
val df_upsert_1 = enrichedActionDf.filter($"action" === "Upsert")
  .withColumn("end_date", enrichedActionDf("src_start_date"))
  .withColumn("is_current", lit(false))
  .select(column_names.map(c => col(c)): _*)
val df_upsert_2 = enrichedActionDf.filter($"action" === "Upsert")
  .select($"id", $"src_attr" as "attr", lit(true) as "is_current",
    lit(false) as "is_deleted", $"src_start_date" as "start_date",
    $"src_end_date" as "end_date")

Now that all the SCD Type 2 actions are implemented, let's union the pieces to produce the final data.

val final_merged_df = df_NoAction.union(df_insert).union(df_delete)
  .union(df_upsert_1).union(df_upsert_2)

In this demo I read all the data into memory, which might not be an efficient solution in a real environment. Instead we can partition the data. For example, in this use case we could partition by end_date; with that approach we read only the active partition to merge with the source data. We can also overwrite a single parquet partition to save IO operations.
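The partition-overwrite idea can be sketched as follows. This is an assumption-laden illustration, not part of the demo: it presumes the target is a table partitioned by end_date (the table name `dim.course_dim` is made up), and it relies on Spark's dynamic partition overwrite mode so that only the partitions present in the result (the active "9999-12-31" partition plus any newly closed ones) are rewritten, leaving untouched historical partitions alone:

```scala
// Rewrite only the partitions that appear in final_merged_df,
// instead of truncating the whole table.
sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
final_merged_df.write
  .mode("overwrite")
  .insertInto("dim.course_dim")  // hypothetical partitioned target table
```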

The source code for this demo is available at link.

