Implementation of CDC in Spark using Delta Lake

Rajesh
3 min read · May 24, 2021

Many times when I work with Kafka I feel tempted to use Kafka itself to store the data, but it should never be used as a datastore; instead we should store the topics in our data lake. I was working on a POC to create a stream which consumes the events coming from Kafka and stores them in our S3 bucket (data lake).

Here I am running the Kafka service on an EC2 instance and I have written a producer in Scala which takes care of sending messages according to our need. My message has three parts: txn_id, amount, status. My streaming application consumes this topic from Kafka and writes to a Delta table while keeping track of offsets in a checkpoint location. In case the txn_id is the same as in previously stored data, we should upsert the existing record instead of inserting a new row. We cannot achieve this using our traditional data lake; for this we should use something like the Delta table provided by the Databricks community.

Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions and scalable metadata handling, runs on top of our data lake, and can be consumed by further downstream applications. The main beauty of a Delta table is that it gives us the ability to update or delete records if a reference to the same value already exists, and we can see the history of changes by running time-travel queries (a small example is shown at the end of this post). This is very beneficial for CDC (change data capture) in real-world big data problems, as our data lake, be it S3 or HDFS, does not support updating data by default.

I have included delta-core and spark-sql-kafka in my project.

Below are the sbt dependencies which I added to my project.
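Something along these lines (a minimal sketch; the exact versions are assumptions and should be aligned with your Spark and Scala build):

```scala
// build.sbt -- versions shown here are illustrative; match delta-core to your Spark version
scalaVersion := "2.12.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.1.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.1",
  "io.delta"         %% "delta-core"           % "1.0.0",
  "org.apache.kafka"  % "kafka-clients"        % "2.8.0"
)
```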

I have created a topic named kafak-topic with a replication factor of 1 and a single partition in Kafka for this demo. Below is my producer class which takes care of sending messages to the above topic.
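A minimal sketch of such a producer, assuming the broker runs on the EC2 host and the messages are simple comma-separated txn_id,amount,status strings (the broker address and sample values below are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TxnProducer extends App {
  val props = new Properties()
  // Placeholder broker address; point this at the Kafka service running on the EC2 instance.
  props.put("bootstrap.servers", "ec2-xx-xx-xx-xx.compute.amazonaws.com:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic = "kafak-topic"

  // Each message carries txn_id, amount and status; the txn_id is also used as the key.
  val messages = Seq("1,100.0,INITIATED", "2,250.5,INITIATED", "1,100.0,COMPLETED")
  messages.foreach { msg =>
    producer.send(new ProducerRecord[String, String](topic, msg.split(",")(0), msg))
  }
  producer.flush()
  producer.close()
}
```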

Now coming to the Spark Structured Streaming consumer, where we consume the topic and write it to the Delta table. We configure a checkpoint location using .option("checkpointLocation", checkPointdir) and at the same time we provide .option("startingOffsets", "earliest"). So if the checkpoint is empty it will read from the earliest offset, as we don't want to miss any messages, and if our streaming application ends abruptly then instead of starting to read from the beginning it will resume from the last processed offset.
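The source side of the consumer looks roughly like this (the broker address is a placeholder, and the column parsing is an assumption based on the comma-separated txn_id,amount,status format):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("cdc-delta-consumer")
  // Required to enable Delta Lake on a plain Spark 3 build.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// startingOffsets only applies when the checkpoint location is empty;
// afterwards the stream resumes from the offsets recorded in the checkpoint.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ec2-xx-xx-xx-xx.compute.amazonaws.com:9092")
  .option("subscribe", "kafak-topic")
  .option("startingOffsets", "earliest")
  .load()

// The Kafka value is a comma-separated txn_id,amount,status string.
val txnDf = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value")
  .select(split(col("value"), ",").as("parts"))
  .select(
    col("parts").getItem(0).as("txn_id"),
    col("parts").getItem(1).cast("double").as("amount"),
    col("parts").getItem(2).as("status"))
```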

Before going ahead and writing data to the Delta table we should make sure an initial Delta table with the schema of our data exists. Ideally we would infer the schema from the topic and store it in S3 for future reference; for this demo I skipped that part and just created an initial empty table with the schema, passing .mode(SaveMode.Ignore). This mode makes sure that if the initial table already exists it will not be created again. Below is how my consumer looks.
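A sketch of the write side, reusing the txnDf stream from the previous snippet (the S3 paths and the checkpoint directory are placeholders):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SaveMode}

val deltaPath     = "s3a://my-data-lake/txn_delta"          // placeholder table location
val checkPointdir = "s3a://my-data-lake/checkpoints/txn"    // placeholder checkpoint location

// Create the initial empty Delta table; SaveMode.Ignore is a no-op if it already exists.
val schema = StructType(Seq(
  StructField("txn_id", StringType),
  StructField("amount", DoubleType),
  StructField("status", StringType)))
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  .write.format("delta").mode(SaveMode.Ignore).save(deltaPath)

// Upsert each micro-batch: update rows with an existing txn_id, insert the rest.
def upsertToDelta(batchDf: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, deltaPath).as("t")
    .merge(batchDf.as("s"), "t.txn_id = s.txn_id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

val query = txnDf.writeStream
  .option("checkpointLocation", checkPointdir)
  .foreachBatch(upsertToDelta _)
  .start()

query.awaitTermination()
```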

In the above code snippets I have included the logic to update my stored data, if a message comes with the same txn_id, inside the foreachBatch of Spark. Please click here to find the source of the project.
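Once the stream has processed a few batches, the time-travel capability mentioned earlier can be used to verify the upserts (a minimal sketch, assuming the same deltaPath and SparkSession as above):

```scala
import io.delta.tables.DeltaTable

// Show the commit history of the table (the initial write plus each merge).
DeltaTable.forPath(spark, deltaPath).history().show(false)

// Time travel: read the table as it looked at an earlier version.
val oldSnapshot = spark.read.format("delta").option("versionAsOf", 0).load(deltaPath)
oldSnapshot.show()
```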
