Implementing Slowly Changing Dimension Type 2 Using Delta Lake on Databricks

Built on Apache Spark, Delta Lake provides a robust storage layer for data in Delta tables. Its features include ACID transactions, high-performance queries, schema evolution, and data versioning, among others.

Today’s focus is on how Delta Lake simplifies the management of slowly changing dimensions (SCDs).

Quick review of SCD Type 2

SCD Type 2 keeps history by:

  • Storing historical dimension data with effective dates.
  • Keeping a full history of dimension changes (with start/end dates).
  • Adding new rows for dimension changes (preserving history).

# Existing dimension data
surrokey  depID  dep    StartDate   EndDate     IsActivity
1         1001   IT     2019-01-01  9999-12-31  1
2         1002   Sales  2019-01-01  9999-12-31  1
3         1003   HR     2019-01-01  9999-12-31  1

# Changed and new data arrive
depID  dep
1002   wholesale   <--- same depID, name changed from "Sales" to "wholesale"
1004   Finance     <--- new data

# The new dimension will be:
surrokey  depID  dep        StartDate   EndDate     IsActivity
1         1001   IT         2019-01-01  9999-12-31  1   <-- no action required
2         1002   Sales      2019-01-01  2020-12-31  0   <-- marked as inactive
3         1003   HR         2019-01-01  9999-12-31  1   <-- no action required
4         1002   wholesale  2021-01-01  9999-12-31  1   <-- updated value added as the active row
5         1004   Finance    2021-01-01  9999-12-31  1   <-- new data inserted
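
The recap above can be traced with a short plain-Python sketch that needs no Spark. The function name apply_scd2, the list-of-dicts layout, and the choice to close the old row on the day before the change (as in the 2020-12-31 / 2021-01-01 dates above) are assumptions made for illustration, not the Delta Lake implementation shown later.

```python
from datetime import date, timedelta

OPEN_END = date(9999, 12, 31)  # sentinel end date carried by the active row


def apply_scd2(dimension, incoming, today):
    """Return a new dimension list with SCD Type 2 changes applied."""
    result = [dict(row) for row in dimension]  # copy, leave the input untouched
    next_key = max((r["surrokey"] for r in result), default=0) + 1
    active = {r["depID"]: r for r in result if r["IsActivity"]}

    for src in incoming:
        current = active.get(src["depID"])
        if current is not None and current["dep"] == src["dep"]:
            continue  # unchanged: no action required
        if current is not None:
            # Changed: close the old version the day before the new one starts.
            current["EndDate"] = today - timedelta(days=1)
            current["IsActivity"] = False
        # Changed or brand new: add the new active version.
        result.append({"surrokey": next_key, "depID": src["depID"],
                       "dep": src["dep"], "StartDate": today,
                       "EndDate": OPEN_END, "IsActivity": True})
        next_key += 1
    return result


start = date(2019, 1, 1)
dim = [
    {"surrokey": 1, "depID": 1001, "dep": "IT", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
    {"surrokey": 2, "depID": 1002, "dep": "Sales", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
    {"surrokey": 3, "depID": 1003, "dep": "HR", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
]
changes = [{"depID": 1002, "dep": "wholesale"}, {"depID": 1004, "dep": "Finance"}]

new_dim = apply_scd2(dim, changes, today=date(2021, 1, 1))
for row in new_dim:
    print(row["surrokey"], row["depID"], row["dep"], row["EndDate"], row["IsActivity"])
```

The renamed department ends up with two rows, one closed and one active, while the unchanged departments keep their single active row.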

Creating demo data

We’re creating a Delta table, dim_dep, and inserting three rows of existing dimension data.

Existing dimension data

%sql
-- Create table dim_dep
create table dim_dep (
  Surrokey   BIGINT GENERATED ALWAYS AS IDENTITY
, depID      INT
, dep        STRING
, StartDate  DATE
, EndDate    DATE
, IsActivity BOOLEAN
)
using delta
location 'dbfs:/mnt/dim/';

-- Insert data
insert into dim_dep (depID, dep, StartDate, EndDate, IsActivity) values
(1001, 'IT',    '2019-01-01', '9999-12-31', true),
(1002, 'Sales', '2019-01-01', '9999-12-31', true),
(1003, 'HR',    '2019-01-01', '9999-12-31', true);

select * from dim_dep;
Surrokey depID	dep	StartDate	EndDate	        IsActivity
1	 1001	IT	2019-01-01	9999-12-31	true
2	 1002	Sales	2019-01-01	9999-12-31	true
3	 1003	HR	2019-01-01	9999-12-31	true
%python
dbutils.fs.ls('dbfs:/mnt/dim')
Out[43]: [FileInfo(path='dbfs:/mnt/dim/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/dim/part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', name='part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', size=1858, modificationTime=1736027755000)]

Incoming source data

The incoming source data may contain new records or updated records.

Changed and new data arrive
depid       dep
1002        Wholesale
1003        HR
1004        Finance

  • depID 1002: dep changed from “Sales” to “Wholesale”, so the dim_dep table must be updated.
  • depID 1003: nothing changed, so no action is required.
  • depID 1004: a new record, to be inserted into dim_dep.

We assume this data, which originates from other business processes, is already stored in the data lake as CSV files.

Implementing SCD Type 2

Step 1: Read the source

%python 
df_dim_dep_source = spark.read.csv('dbfs:/FileStore/dep.csv', header=True)

df_dim_dep_source.show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Step 2: Read the target

df_dim_dep_target = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_dim_dep_target.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Step 3: Left outer join the source to the target

We left outer join the source DataFrame, df_dim_dep_source, to the target DataFrame, df_dim_dep_target, on source depid = target depID, with the extra condition that the target row’s IsActivity is true (the row is active).

The left join ensures that no new row coming from the source is missed. Only active target rows take part in the join because they are the only rows an SCD update can affect. The resulting DataFrame is shown below.

src = df_dim_dep_source
tar = df_dim_dep_target
df_joined = src.join(
        tar,
        # match on the business key, against active target rows only
        (src.depid == tar.depID) & tar.IsActivity,
        'left'
    ).select(
        src['*'],
        tar.Surrokey.alias('tar_surrokey'),
        tar.depID.alias('tar_depID'),
        tar.dep.alias('tar_dep'),
        tar.StartDate.alias('tar_StartDate'),
        tar.EndDate.alias('tar_EndDate'),
        tar.IsActivity.alias('tar_IsActivity')
    )
    
df_joined.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1003|       HR|           3|     1003|     HR|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
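
Why the active-row test sits inside the join condition, rather than in a filter applied after the join, can be traced with a small plain-Python stand-in for the two DataFrames. The helper name left_join_active and the sample rows (including a hypothetical closed history row for 1002) are assumptions for illustration; no Spark is needed to run it.

```python
def left_join_active(source, target):
    """Left join source rows to ACTIVE target rows on depid; unmatched rows get None."""
    # Index only the active target rows, which mirrors putting the
    # IsActivity test inside the join condition.
    active = {str(t["depID"]): t for t in target if t["IsActivity"]}
    joined = []
    for s in source:
        t = active.get(s["depid"])  # None when the key is new or has only closed rows
        joined.append({
            "depid": s["depid"],
            "dep": s["dep"],
            "tar_depID": t["depID"] if t else None,
            "tar_dep": t["dep"] if t else None,
        })
    return joined


source = [
    {"depid": "1002", "dep": "Wholesale"},
    {"depid": "1004", "dep": "Finance"},
]
target = [
    {"depID": 1002, "dep": "Retail", "IsActivity": False},  # hypothetical closed history row
    {"depID": 1002, "dep": "Sales", "IsActivity": True},
]

joined = left_join_active(source, target)
for row in joined:
    print(row)
```

Every source row survives the join: 1002 pairs only with its active “Sales” row, and 1004 comes through with empty target columns. Filtering on the active flag after the join would instead drop the 1004 row, because its target columns are null.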

Step 4: Filter only the non matched and updated records

This demo has only two columns, depid and dep. A real dimension table may have many more.

Instead of comparing every column pair, e.g.,
src_col1 != tar_col1,
src_col2 != tar_col2,
…..
src_colN != tar_colN
we compute one hash over the source columns and one over the target columns, then compare the two hashes. Because the source and target data types may differ, we cast each column to the same type before hashing.

from pyspark.sql.functions import col , xxhash64

df_filtered = df_joined.filter(\
    xxhash64(col('depid').cast('string'),col('dep').cast('string')) \
    != \
    xxhash64(col('tar_depID').cast('string'),col('tar_dep').cast('string'))\
    )
    
df_filtered.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+

From the result, we can see:

  • The row with depid = 1003, dep = HR was filtered out because the source and target sides are identical. No action is required.
  • The row with depid = 1002 changed dep from “Sales” to “Wholesale” and needs updating.
  • The row with depid = 1004 (Finance) is brand new and must be inserted into the target dimension table.
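
The compare-by-hash idea, including the cast to a common type, can be sketched without Spark. This stand-in uses hashlib.sha256 rather than Spark's xxhash64, and row_fingerprint is a hypothetical helper, so treat it as an illustration of the principle only.

```python
import hashlib


def row_fingerprint(row, columns):
    """Hash the chosen columns of a row after casting every value to a string."""
    # Assumption: None is mapped to an empty string, so None and "" hash alike.
    parts = ["" if row.get(c) is None else str(row[c]) for c in columns]
    # A separator keeps ("ab", "c") distinct from ("a", "bc").
    joined = "\x1f".join(parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


src_cols = ["depid", "dep"]
tar_cols = ["tar_depID", "tar_dep"]

rows = [
    {"depid": "1002", "dep": "Wholesale", "tar_depID": 1002, "tar_dep": "Sales"},
    {"depid": "1003", "dep": "HR", "tar_depID": 1003, "tar_dep": "HR"},
    {"depid": "1004", "dep": "Finance", "tar_depID": None, "tar_dep": None},
]

# Keep only rows whose source and target fingerprints differ.
filtered = [r for r in rows
            if row_fingerprint(r, src_cols) != row_fingerprint(r, tar_cols)]
print([r["depid"] for r in filtered])
```

The 1003 row drops out even though its source depid is a string and its target depID is an integer, which is the reason for casting before hashing.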

Step 5: Find out records that will be used for inserting

From the discussion above, we know that depid = 1002 needs updating and depid = 1004 is a new record. We add a new column, merge_key, which the upsert in Step 8 matches against the target’s depID. In this DataFrame it holds the source depid.

Add a new column: “merge_key”

df_inserting = df_filtered.withColumn('merge_key', col('depid'))

df_inserting.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+

Both rows keep their own depid as merge_key. In the Step 8 merge, the 1004 row matches no target row and is inserted as a new record, while the 1002 row matches the active “Sales” row and causes it to be closed.

Step 6: Find out the records that will be used for updating in target table

from pyspark.sql.functions import lit
df_updating = df_filtered.filter(col('tar_depID').isNotNull()).withColumn('merge_key', lit('None'))

df_updating.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     None|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+

This DataFrame keeps the rows whose tar_depID is not null, that is, rows whose depID already has an active record in the dimension table and whose attributes have changed. Here merge_key is the literal string ‘None’, which equals no depID in the target, so in the Step 8 merge this copy of the row is not matched and is inserted as the new active version. The copy from Step 5, whose merge_key is the real depid, is the one that matches the old record and closes it.

Step 7: Combine inserting and updating records as stage

df_stage_final = df_updating.union(df_inserting)

df_stage_final.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     None| <-- no match in the merge: inserted as the new active row
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002| <-- matches the active target row: closes it
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004| <-- no match in the merge: inserted as a new row
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
  • A record whose merge_key equals the depID of an active target row is matched by the merge, which marks that target row inactive and sets its EndDate.
  • A record whose merge_key matches no active target row (the ‘None’ copy, or a brand-new depid such as 1004) is inserted as a new active row.
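
The shape of the staged set can be reproduced in plain Python. The sketch below mirrors Steps 5 to 7 on a list of dicts; build_stage is a hypothetical helper and the rows carry only the columns needed to show the pattern.

```python
def build_stage(filtered_rows):
    """Mirror Steps 5-7: key every filtered row by depid, then add a
    'None'-keyed copy of each row that already exists in the target."""
    inserting = [dict(r, merge_key=r["depid"]) for r in filtered_rows]
    updating = [dict(r, merge_key="None")
                for r in filtered_rows if r["tar_depID"] is not None]
    return updating + inserting  # same order as df_updating.union(df_inserting)


filtered = [
    {"depid": "1002", "dep": "Wholesale", "tar_depID": 1002, "tar_dep": "Sales"},
    {"depid": "1004", "dep": "Finance", "tar_depID": None, "tar_dep": None},
]

stage = build_stage(filtered)
keys = [(r["depid"], r["merge_key"]) for r in stage]
print(keys)
```

A changed record appears twice in the stage, once per merge_key value, while a brand-new record appears once.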

Step 8: Upserting the dim_dep Dimension Table

Before performing the upsert, let’s quickly review the existing dim_dep table and the incoming source data.

# Existing dim_dep table
spark.read.table('dim_dep').show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

# Incoming source data
spark.read.csv('dbfs:/FileStore/dep.csv', header=True).show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Implementing an SCD Type 2 UpSert on the dim_dep Dimension Table

from delta.tables import DeltaTable

# Define the source DataFrame
src = df_stage_final  # this is a DataFrame object

# Load the target Delta table
tar = DeltaTable.forPath(spark, "dbfs:/mnt/dim")  # target dimension table

# Perform the upsert
(
    tar.alias("tar")
    .merge(
        src.alias("src"),
        # merge_key is a string column, so compare the keys as strings.
        # The active flag must come from the target (tar.IsActivity),
        # not from the staged column tar_IsActivity.
        condition="cast(tar.depID as string) = src.merge_key and tar.IsActivity = true",
    )
    # Matched: an active row exists for this key, so close it
    .whenMatchedUpdate(
        set={
            "IsActivity": "false",
            "EndDate": "current_date()",
        }
    )
    # Not matched: insert the staged row as a new active version
    .whenNotMatchedInsert(
        values={
            "depID": "cast(src.depid as int)",
            "dep": "src.dep",
            "StartDate": "current_date()",
            "EndDate": "to_date('9999-12-31', 'yyyy-MM-dd')",
            "IsActivity": "true",
        }
    )
    .execute()
)

all done!
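
To trace which staged row takes which branch of the merge, here is a plain-Python emulation of the matched and not-matched logic. It is a sketch under stated assumptions: merge_scd2 is a hypothetical helper, keys are compared as strings, surrogate keys are handed out consecutively, and the run date is fixed at 2020-01-05. It is not how Delta Lake executes a merge internally.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # sentinel end date of an active row


def merge_scd2(target, stage, today):
    """Emulate the merge: close matched active rows, insert unmatched stage rows."""
    result = [dict(row) for row in target]  # work on a copy
    to_insert = []
    for src in stage:
        # Match against the original target rows only; rows inserted by
        # this merge never take part in matching.
        hits = [i for i, t in enumerate(target)
                if t["IsActivity"] and str(t["depID"]) == src["merge_key"]]
        if hits:
            for i in hits:  # matched branch: close the old version
                result[i]["IsActivity"] = False
                result[i]["EndDate"] = today
        else:  # not-matched branch: insert as a new active row
            to_insert.append(src)

    # Simplification: consecutive keys; real identity values need not be consecutive.
    next_key = max((r["Surrokey"] for r in result), default=0) + 1
    for src in to_insert:
        result.append({"Surrokey": next_key, "depID": int(src["depid"]),
                       "dep": src["dep"], "StartDate": today,
                       "EndDate": OPEN_END, "IsActivity": True})
        next_key += 1
    return result


start = date(2019, 1, 1)
target = [
    {"Surrokey": 1, "depID": 1001, "dep": "IT", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
    {"Surrokey": 2, "depID": 1002, "dep": "Sales", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
    {"Surrokey": 3, "depID": 1003, "dep": "HR", "StartDate": start, "EndDate": OPEN_END, "IsActivity": True},
]
stage = [
    {"depid": "1002", "dep": "Wholesale", "merge_key": "None"},
    {"depid": "1002", "dep": "Wholesale", "merge_key": "1002"},
    {"depid": "1004", "dep": "Finance", "merge_key": "1004"},
]

merged = merge_scd2(target, stage, today=date(2020, 1, 5))
summary = [(r["Surrokey"], r["depID"], r["dep"], r["IsActivity"])
           for r in sorted(merged, key=lambda r: (r["depID"], r["Surrokey"]))]
for line in summary:
    print(line)
```

Only the staged row whose merge_key is “1002” finds a match, so it closes the “Sales” row; the other two staged rows find no match and become new active rows.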

Validating the result

spark.read.table('dim_dep').sort(['depID','Surrokey']).show()
+--------+-----+---------+----------+----------+----------+
|Surrokey|depID|      dep| StartDate|   EndDate|IsActivity|
+--------+-----+---------+----------+----------+----------+
|       1| 1001|       IT|2019-01-01|9999-12-31|      true|
|       2| 1002|    Sales|2019-01-01|2020-01-05|     false| <-- deactivated
|       4| 1002|Wholesale|2020-01-05|9999-12-31|      true| <-- new active version
|       3| 1003|       HR|2019-01-01|9999-12-31|      true|
|       5| 1004|  Finance|2020-01-05|9999-12-31|      true| <-- new record appended
+--------+-----+---------+----------+----------+----------+
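
Beyond reading the table by eye, two basic invariants can be checked programmatically. The sketch below does so in plain Python on the rows shown above; check_scd2 is a hypothetical helper, and the rule that every depID has exactly one active row is an assumption that holds for this demo but not for designs that allow fully closed members.

```python
from collections import Counter

OPEN_END = "9999-12-31"  # sentinel end date of an active row


def check_scd2(rows):
    """Return a list of problems; an empty list means both invariants hold."""
    problems = []
    # Invariant 1: each depID has exactly one active row.
    active = Counter(r["depID"] for r in rows if r["IsActivity"])
    for dep_id in sorted({r["depID"] for r in rows}):
        if active[dep_id] != 1:
            problems.append(f"depID {dep_id}: {active[dep_id]} active rows, expected 1")
    # Invariant 2: a row is active exactly when its EndDate is the sentinel.
    for r in rows:
        if r["IsActivity"] != (r["EndDate"] == OPEN_END):
            problems.append(f"Surrokey {r['Surrokey']}: EndDate and IsActivity disagree")
    return problems


rows = [
    {"Surrokey": 1, "depID": 1001, "EndDate": "9999-12-31", "IsActivity": True},
    {"Surrokey": 2, "depID": 1002, "EndDate": "2020-01-05", "IsActivity": False},
    {"Surrokey": 4, "depID": 1002, "EndDate": "9999-12-31", "IsActivity": True},
    {"Surrokey": 3, "depID": 1003, "EndDate": "9999-12-31", "IsActivity": True},
    {"Surrokey": 5, "depID": 1004, "EndDate": "9999-12-31", "IsActivity": True},
]

print(check_scd2(rows))
```

An empty list means both checks pass for the validated result.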

Conclusion

We demonstrated how to implement Slowly Changing Dimension (SCD) Type 2 with Delta Lake: join the incoming data to the active dimension rows, keep only the new and changed records, stage them with a merge key, and apply a single merge that closes the old version and inserts the new one. The dimension table then keeps the full history of every change.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)