Built on Apache Spark, Delta Lake provides a robust storage layer for data in Delta tables. Its features include ACID transactions, high-performance queries, schema evolution, and data versioning, among others.
Today’s focus is on how Delta Lake simplifies the management of slowly changing dimensions (SCDs).
A Quick Review of SCD Type 2
SCD Type 2 tracks dimension history as follows:
- Storing historical dimension data with effective dates.
- Keeping a full history of dimension changes (with start/end dates).
- Adding new rows for dimension changes (preserving history).
# Existing Dimension data
surrokey depID dep StartDate EndDate IsActivity
1 1001 IT 2019-01-01 9999-12-31 1
2 1002 Sales 2019-01-01 9999-12-31 1
3 1003 HR 2019-01-01 9999-12-31 1
# Dimension changed and new data comes
depID dep
1002  Wholesale   <--- depID is the same, dep changed from "Sales" to "Wholesale"
1004  Finance     <--- new data
# the new Dimension will be:
surrokey depID dep        StartDate  EndDate    IsActivity
1        1001  IT         2019-01-01 9999-12-31 1   <-- no action required
2        1002  Sales      2019-01-01 2020-12-31 0   <-- mark as inactive
3        1003  HR         2019-01-01 9999-12-31 1   <-- no action required
4        1002  Wholesale  2021-01-01 9999-12-31 1   <-- add updated active value
5        1004  Finance    2021-01-01 9999-12-31 1   <-- insert new data
Creating demo data
We create a Delta table, dim_dep, and insert three rows of existing dimension data.
Existing dimension data
%sql
-- Create the dim_dep table
create table dim_dep (
  Surrokey BIGINT GENERATED ALWAYS AS IDENTITY
  , depID int
  , dep string
  , StartDate DATE
  , EndDate DATE
  , IsActivity BOOLEAN
)
using delta
location 'dbfs:/mnt/dim/';

-- Insert the existing dimension data
insert into dim_dep (depID, dep, StartDate, EndDate, IsActivity) values
  (1001, 'IT',    '2019-01-01', '9999-12-31', true),
  (1002, 'Sales', '2019-01-01', '9999-12-31', true),
  (1003, 'HR',    '2019-01-01', '9999-12-31', true);

select * from dim_dep;
Surrokey depID dep StartDate EndDate IsActivity
1 1001 IT 2019-01-01 9999-12-31 true
2 1002 Sales 2019-01-01 9999-12-31 true
3 1003 HR 2019-01-01 9999-12-31 true
%python
dbutils.fs.ls('dbfs:/mnt/dim')
Out[43]: [FileInfo(path='dbfs:/mnt/dim/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
FileInfo(path='dbfs:/mnt/dim/part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', name='part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', size=1858, modificationTime=1736027755000)]

Incoming source data
The incoming source data may contain new records as well as updated records.
Dimension changed and new data comes
depId dep
1002 Wholesale
1003 HR
1004 Finance
- depID 1002: dep changed from "Sales" to "Wholesale", so the dim_dep table needs an SCD update;
- depID 1003: nothing changed, no action required;
- depID 1004: a new record to insert into dim_dep.
Assume this data, produced by other business processes, has landed in the data lake as CSV files.
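If you want to reproduce the demo end to end, the source file can be written from a notebook as well. Below is a minimal sketch; the path is an assumption that simply mirrors the one read in Step 1.
%python
# Write the demo source data to DBFS as CSV (sketch for reproducing the demo;
# the target path mirrors the one read in Step 1).
from pyspark.sql import Row

src_rows = [Row(depid=1002, dep='Wholesale'),
            Row(depid=1003, dep='HR'),
            Row(depid=1004, dep='Finance')]

(spark.createDataFrame(src_rows)
      .coalesce(1)
      .write.mode('overwrite')
      .option('header', True)
      .csv('dbfs:/FileStore/dep.csv'))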
Implementing SCD Type 2
Step 1: Read the source
%python
df_dim_dep_source = spark.read.csv('dbfs:/FileStore/dep.csv', header=True)
df_dim_dep_source.show()
+-----+---------+
|depid| dep|
+-----+---------+
| 1002|Wholesale|
| 1003| HR|
| 1004| Finance|
+-----+---------+
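Note that spark.read.csv with only header=True reads every column as a string, which you can confirm with printSchema. That is fine for this walkthrough, because the change-detection hash in Step 4 casts everything to string anyway.
%python
# Without an explicit schema, both CSV columns come back as strings.
df_dim_dep_source.printSchema()
# root
#  |-- depid: string (nullable = true)
#  |-- dep: string (nullable = true)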
Step 2: Read the target
df_dim_dep_target = spark.read.format("delta").load("dbfs:/mnt/dim/")
df_dim_dep_target.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID| dep| StartDate| EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
| 1| 1001| IT|2019-01-01|9999-12-31| true|
| 2| 1002|Sales|2019-01-01|9999-12-31| true|
| 3| 1003| HR|2019-01-01|9999-12-31| true|
+--------+-----+-----+----------+----------+----------+
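The target can equally be loaded through the metastore table name or through the DeltaTable API that Step 8 uses; a couple of equivalent options, assuming the table is registered as dim_dep:
%python
from delta.tables import DeltaTable

# Equivalent ways to load the target dimension table.
df_dim_dep_target = spark.table('dim_dep')                               # via the table name
df_dim_dep_target = DeltaTable.forPath(spark, 'dbfs:/mnt/dim/').toDF()   # via the Delta path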
Step 3: Left outer join source to target
We left outer join the source dataframe (df_dim_dep_source) to the target dataframe (df_dim_dep_target) on source depID = target depID, additionally requiring the target's IsActivity = true (i.e., the currently active rows).
The join is built this way so that no new source data is missed, while only active target records are pulled in, because those are the only rows an SCD update applies to. The joined result is shown below.
src = df_dim_dep_source
tar = df_dim_dep_target

df_joined = (src.join(tar,
                      (src.depid == tar.depID) & (tar.IsActivity == True),
                      'left')
                .select(src['*'],
                        tar.Surrokey.alias('tar_surrokey'),
                        tar.depID.alias('tar_depID'),
                        tar.dep.alias('tar_dep'),
                        tar.StartDate.alias('tar_StartDate'),
                        tar.EndDate.alias('tar_EndDate'),
                        tar.IsActivity.alias('tar_IsActivity')))
df_joined.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true|
| 1003| HR| 3| 1003| HR| 2019-01-01| 9999-12-31| true|
| 1004| Finance| null| null| null| null| null| null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
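One detail worth stressing: the IsActivity predicate has to live in the join condition itself. If it were applied as a filter after the left join, the brand-new source rows (whose target columns are all null) would be dropped too. A small sketch of the pitfall:
%python
# Anti-pattern: filtering on the target's activity flag after the left join also
# removes unmatched source rows such as depid 1004, because their tar columns are
# null and the predicate evaluates to null.
df_wrong = (src.join(tar, src.depid == tar.depID, 'left')
               .where(tar.IsActivity == True))
df_wrong.show()   # depid 1004 no longer appears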
Step 4: Keep only the new and updated records
In this demo there are only two columns, depid and dep, but a real dimension table may have many more.
Instead of comparing the columns one by one, e.g.,
src_col1 != tar_col1,
src_col2 != tar_col2,
...
src_colN != tar_colN
we compute a hash over each side's columns and compare the two hashes. We also cast every column to the same data type (string) before hashing, so that differing data types do not affect the comparison. (A generalized, list-driven version of this comparison is sketched at the end of this step.)
from pyspark.sql.functions import col, xxhash64

df_filtered = df_joined.filter(
    xxhash64(col('depid').cast('string'), col('dep').cast('string'))
    != xxhash64(col('tar_depID').cast('string'), col('tar_dep').cast('string'))
)
df_filtered.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true|
| 1004| Finance| null| null| null| null| null| null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
From the result, we can see:
- The row with depid = 1003 (dep = HR) was filtered out because source and target are identical; no action required.
- The row with depid = 1002 changed from "Sales" to "Wholesale" and needs an SCD update.
- The row with depid = 1004 (Finance) is brand new and needs to be inserted into the target dimension table.
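For wider dimension tables, the same hash comparison can be driven by column lists instead of being written out by hand. A minimal sketch; the column lists are placeholders for whatever attribute columns your dimension actually carries:
%python
# Generalized change detection over N attribute columns.
from pyspark.sql.functions import col, xxhash64

src_cols = ['depid', 'dep']            # source attribute columns (illustrative)
tar_cols = ['tar_depID', 'tar_dep']    # corresponding target columns (illustrative)

src_hash = xxhash64(*[col(c).cast('string') for c in src_cols])
tar_hash = xxhash64(*[col(c).cast('string') for c in tar_cols])
df_filtered = df_joined.filter(src_hash != tar_hash)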
Step 5: Prepare records with merge_key set to the source depid
From the discussion above, we know depid = 1002 needs an SCD update and depid = 1004 is a new record. We now add a column, merge_key, that will drive the upsert. In this first set, merge_key holds the source depid: a row whose key matches an active target row (1002) will expire that row during the merge, while a row with a brand-new key (1004) will not match and will simply be inserted.
Add a new column – "merge_key"
df_inserting = df_filtered.withColumn('merge_key', col('depid'))
df_inserting.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true| 1002|
| 1004| Finance| null| null| null| null| null| null| 1004|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
Both rows above carry merge_key = depid. During the merge, the 1002 row will match its active target row and expire it, while the 1004 row has no match and will be inserted as a brand-new record.
Step 6: Prepare the new active versions of changed records (merge_key = 'None')
from pyspark.sql.functions import lit

df_updating = (df_filtered
               .filter(col('tar_depID').isNotNull())
               .withColumn('merge_key', lit('None')))
df_updating.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true| None|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
The above record carries the new value (Wholesale) for a dimension member that already exists in the target table.
This dataframe keeps only the rows whose tar_depID is not null, i.e., rows whose key already exists in the target and whose attributes have changed. Their merge_key is set to the string 'None' so that they never match in the merge condition; they therefore fall into the insert branch and become the new active versions, while their merge_key = depid counterparts from Step 5 take care of expiring the old rows.
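Because the target depID is numeric, the string 'None' can never satisfy the merge condition, which is exactly what forces these rows into the insert branch. A genuine NULL works the same way and reads a little cleaner; a minimal sketch (cast to string so the later union with df_inserting keeps a consistent column type):
%python
# Alternative: use a real NULL merge_key for rows that must never match.
from pyspark.sql.functions import col, lit

df_updating = (df_filtered
               .filter(col('tar_depID').isNotNull())
               .withColumn('merge_key', lit(None).cast('string')))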
Step 7: Combine both sets into a staging dataframe
df_stage_final = df_updating.union(df_inserting)
df_stage_final.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true| None| <-- no match on merge_key; inserted as the new active row
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true| 1002| <-- matches the active target row; that row gets expired
| 1004| Finance| null| null| null| null| null| null| 1004| <-- no match; inserted as a brand-new record
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
- Rows with merge_key = 'None' never match the target, so they are inserted as the new active versions of changed dimension members.
- Rows with merge_key = depid match the corresponding active target rows and expire them; if the key does not exist in the target yet (e.g., 1004), the row is inserted as a brand-new record.
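Only depid, dep and merge_key are referenced by the merge in Step 8; the tar_* columns are carried along purely so the staged rows are easy to inspect. If you prefer a leaner staging dataframe, they can be dropped:
%python
# Optional: keep only the columns the MERGE statement actually references.
df_stage_final = df_stage_final.select('depid', 'dep', 'merge_key')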
Step 8: Upserting the dim_dep Dimension Table
Before performing the upsert, let's quickly review the existing dim_dep table and the incoming source data.
# Existing dim_dep table
spark.read.table('dim_dep').show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID| dep| StartDate| EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
| 1| 1001| IT|2019-01-01|9999-12-31| true|
| 2| 1002|Sales|2019-01-01|9999-12-31| true|
| 3| 1003| HR|2019-01-01|9999-12-31| true|
+--------+-----+-----+----------+----------+----------+
# Incoming source data
spark.read.csv('dbfs:/FileStore/dep.csv', header=True).show()
+-----+---------+
|depid| dep|
+-----+---------+
| 1002|Wholesale|
| 1003| HR|
| 1004| Finance|
+-----+---------+
Implementing an SCD Type 2 Upsert on the dim_dep Dimension Table
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, to_date, lit
# define the source DataFrame
src = df_stage_final # this is a DataFrame object
# Load the target Delta table
tar = DeltaTable.forPath(spark, "dbfs:/mnt/dim") # target Dimension table
# Perform the upsert
tar.alias("tar").merge(
    src.alias("src"),
    condition="tar.depID = src.merge_key AND tar.IsActivity = true"
).whenMatchedUpdate(
    set={
        "IsActivity": "false",
        "EndDate": "current_date()"
    }
).whenNotMatchedInsert(
    values={
        "depID": "src.depid",
        "dep": "src.dep",
        "StartDate": "current_date()",
        "EndDate": "to_date('9999-12-31', 'yyyy-MM-dd')",
        "IsActivity": "true"
    }
).execute()
All done!
Validating the result
spark.read.table('dim_dep').sort(['depID','Surrokey']).show()
+--------+-----+---------+----------+----------+----------+
|Surrokey|depID| dep| StartDate| EndDate|IsActivity|
+--------+-----+---------+----------+----------+----------+
| 1| 1001| IT|2019-01-01|9999-12-31| true|
| 2| 1002| Sales|2019-01-01|2020-01-05| false| <-- old row expired (marked inactive)
| 4| 1002|Wholesale|2020-01-05|9999-12-31| true| <-- new active version inserted
| 3| 1003| HR|2019-01-01|9999-12-31| true|
| 5| 1004| Finance|2020-01-05|9999-12-31| true| <-- brand-new record inserted
+--------+-----+---------+----------+----------+----------+
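Because Delta Lake versions every write, the merge is also visible in the table history, which is handy for auditing or for rolling back; a quick check (version numbers will differ in your environment):
%python
# Each write (CREATE TABLE, INSERT, MERGE) appears as a version in the Delta log.
spark.sql("DESCRIBE HISTORY dim_dep").select('version', 'operation').show()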
Conclusion
We demonstrated how to implement Slowly Changing Dimension (SCD) Type 2 on Delta Lake. By staging the changed and new records and applying a single MERGE, Delta Lake expires outdated dimension rows and inserts their new versions in one atomic, ACID-compliant operation, keeping a full, queryable history of dimension changes in the data lake.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
(remove all space from the email account 😊)