Implement SCD Type 2 via Spark DataFrames

SCD Type 2 preserves history: when a tracked attribute changes, the current record is expired and a new current record is inserted. The walkthrough below builds the existing target data and the incoming source data, then derives the final output.
// Required imports (assumes an existing SparkSession named sparkSession)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.implicits._

// Sample target (existing) data: id 1 has one expired and one current record.
val targetList = List(
  Row(1, "Hello!", false, false, "2020-01-01", "2021-01-31"),
  Row(1, "Hadoop", true, false, "2019-01-01", "9999-12-31"),
  Row(2, "Hadoop with Java", true, false, "2019-02-01", "9999-12-31"),
  Row(3, "old system", true, false, "2019-02-01", "9999-12-31"))
val schema_target = StructType(List(
  StructField("id", IntegerType, true),
  StructField("attr", StringType, true),
  StructField("is_current", BooleanType, true),
  StructField("is_deleted", BooleanType, true),
  StructField("start_date", StringType, true),
  StructField("end_date", StringType, true)))
val targetDf = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(targetList), schema_target)
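To verify the setup, you can display the target frame; with the rows above, targetDf.show(false) should print something like:

targetDf.show(false)
+---+----------------+----------+----------+----------+----------+
|id |attr            |is_current|is_deleted|start_date|end_date  |
+---+----------------+----------+----------+----------+----------+
|1  |Hello!          |false     |false     |2020-01-01|2021-01-31|
|1  |Hadoop          |true      |false     |2019-01-01|9999-12-31|
|2  |Hadoop with Java|true      |false     |2019-02-01|9999-12-31|
|3  |old system      |true      |false     |2019-02-01|9999-12-31|
+---+----------------+----------+----------+----------+----------+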
// Sample source data
val sourcelist = List(
  Row(1, "Spark"),
  Row(2, "PySpark!"),
  Row(4, "Scala!"))
val schema_source = StructType(List(
  StructField("src_id", IntegerType, true),
  StructField("src_attr", StringType, true)))
val sourceDf = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(sourcelist), schema_source)
// Incoming records become effective today and stay open-ended until
// superseded. todayDate and newsourcedf are referenced by the code below.
val todayDate = java.time.LocalDate.now.toString
val newsourcedf = sourceDf
  .withColumn("src_start_date", lit(todayDate))
  .withColumn("src_end_date", lit("9999-12-31"))
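Assuming the enrichment above, the source frame should look like this, with <today> standing for the run date:

newsourcedf.show(false)
+------+--------+--------------+------------+
|src_id|src_attr|src_start_date|src_end_date|
+------+--------+--------------+------------+
|1     |Spark   |<today>       |9999-12-31  |
|2     |PySpark!|<today>       |9999-12-31  |
|4     |Scala!  |<today>       |9999-12-31  |
+------+--------+--------------+------------+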

Implement a full outer join between the source and target data frames

Each joined row falls into one of four actions:

  • UPSERT: attributes have changed in the source; the existing record must be expired and a new record inserted.
  • DELETE: the business key no longer exists in the source; the record in the target table must be deleted logically.
  • INSERT: a new business key exists in the source and must be inserted into the target table directly.
  • NOACTION: the attributes are unchanged, or the target record is not current.
The join matches only current target records (end_date equal to the open-ended src_end_date), so expired history rows pass through untouched:

// Full outer join on the business key; matching on end_date restricts the
// join to current target records.
val joinDataDf = targetDf.join(newsourcedf,
  targetDf("id") === newsourcedf("src_id") &&
  targetDf("end_date") === newsourcedf("src_end_date"), "full")
// Classify each joined row into one of the four actions.
val enrichedActionDf = joinDataDf.withColumn("action",
  when(joinDataDf("attr") =!= joinDataDf("src_attr"), "Upsert")
    .when(joinDataDf("src_attr").isNull && joinDataDf("is_current"), "Delete")
    .when(joinDataDf("id").isNull, "Insert")
    .otherwise("NoAction"))
val column_names = Seq("id", "attr", "is_current", "is_deleted", "start_date", "end_date")
// Unchanged rows (and expired history) pass through as-is.
val df_NoAction = enrichedActionDf.filter($"action" === "NoAction")
  .select(column_names.map(c => col(c)): _*)
// New business keys become current records.
val df_insert = enrichedActionDf.filter($"action" === "Insert")
  .select($"src_id" as "id", $"src_attr" as "attr", lit(true) as "is_current",
    lit(false) as "is_deleted", $"src_start_date" as "start_date", $"src_end_date" as "end_date")
// Keys missing from the source are logically deleted and expired today.
val df_delete = enrichedActionDf.filter($"action" === "Delete")
  .withColumn("end_date", lit(todayDate)).withColumn("is_deleted", lit(true))
  .withColumn("is_current", lit(false)).select(column_names.map(c => col(c)): _*)
// Upsert, part 1: expire the old record as of the new record's start date.
val df_upsert_1 = enrichedActionDf.filter($"action" === "Upsert")
  .withColumn("end_date", enrichedActionDf("src_start_date"))
  .withColumn("is_current", lit(false)).select(column_names.map(c => col(c)): _*)
// Upsert, part 2: insert the changed attributes as the new current record.
val df_upsert_2 = enrichedActionDf.filter($"action" === "Upsert")
  .select($"id", $"src_attr" as "attr", lit(true) as "is_current",
    lit(false) as "is_deleted", $"src_start_date" as "start_date", $"src_end_date" as "end_date")
// Union the five branches into the final SCD Type 2 result.
val final_merged_df = df_NoAction.union(df_insert).union(df_delete)
  .union(df_upsert_1).union(df_upsert_2)
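Ordered by key and start date, the merged frame should hold the full history, with one expired and one current version for each changed key. A sketch of the expected result, where <today> stands for the run date:

final_merged_df.orderBy("id", "start_date").show(false)
+---+----------------+----------+----------+----------+----------+
|id |attr            |is_current|is_deleted|start_date|end_date  |
+---+----------------+----------+----------+----------+----------+
|1  |Hadoop          |false     |false     |2019-01-01|<today>   |
|1  |Hello!          |false     |false     |2020-01-01|2021-01-31|
|1  |Spark           |true      |false     |<today>   |9999-12-31|
|2  |Hadoop with Java|false     |false     |2019-02-01|<today>   |
|2  |PySpark!        |true      |false     |<today>   |9999-12-31|
|3  |old system      |false     |true      |2019-02-01|<today>   |
|4  |Scala!          |true      |false     |<today>   |9999-12-31|
+---+----------------+----------+----------+----------+----------+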
