Implementing Slowly Changing Dimension Type 2 Using Delta Lake on Databricks

Built on Apache Spark, Delta Lake provides a robust storage layer for data in Delta tables. Its features include ACID transactions, high-performance queries, schema evolution, and data versioning, among others.

Today’s focus is on how Delta Lake simplifies the management of slowly changing dimensions (SCDs).

Quickly review Type 2 of Slowly Changing Dimension 

A quick recap of SCD Type 2 follows:

  • Storing historical dimension data with effective dates.
  • Keeping a full history of dimension changes (with start/end dates).
  • Adding new rows for dimension changes (preserving history).
# Existing Dimension data
surrokey  depID   dep	StartDate   EndDate     IsActivity
1	  1001	  IT	2019-01-01  9999-12-31  1
2	  1002	  Sales	2019-01-01  9999-12-31  1
3	  1003	  HR	2019-01-01  9999-12-31  1

# Dimension changed and new data comes 
depId dep
1003  wholesale   <--- depID is same, name changed from "Sales" to "wholesale"
1004  Finance     <--- new data

# the new Dimension will be:
surrokey  depID	dep	   StartDate   EndDate     IsActivity 
1	  1001	IT	   2019-01-01  9999-12-31  1   <-- No action required
2	  1002	HR	   2019-01-01  9999-12-31  1   <-- No action required
3	  1003	Sales	   2019-01-01  2020-12-31  0   <-- mark as inactive
4         1003  wholesale  2021-01-01  9999-12-31  1   <-- add updated active value
5         1004  Finance    2021-01-01  9999-12-31  1   <-- insert new data

Creating demo data

We’re creating a Delta table, dim_dep, and inserting three rows of existing dimension data.

Existing dimension data

%sql
# Create table dim_dep
%sql
create table dim_dep (
Surrokey BIGINT  GENERATED ALWAYS AS IDENTITY
, depID  int
, dep	string
, StartDate   DATE 
, End_date DATE 
, IsActivity BOOLEAN
)
using delta
location 'dbfs:/mnt/dim/'

# Insert data
insert into dim_dep (depID,dep, StartDate,EndDate,IsActivity) values
(1001,'IT','2019-01-01', '9999-12-31' , 1),
(1002,'Sales','2019-01-01', '9999-12-31' , 1),
(1003,'HR','2019-01-01', '9999-12-31' , 1)

select * from dim_dep
Surrokey depID	dep	StartDate	EndDate	        IsActivity
1	 1001	IT	2019-01-01	9999-12-31	true
2	 1002	Sales	2019-01-01	9999-12-31	true
3	 1003	HR	2019-01-01	9999-12-31	true
%python
dbutils.fs.ls('dbfs:/mnt/dim')
path	name	size	modificationTime
Out[43]: [FileInfo(path='dbfs:/mnt/dim/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/dim/part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', name='part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', size=1858, modificationTime=1736027755000)]

New coming source data

The new coming source data which may contain new record or updated record.

Dimension changed and new data comes 
depId       dep
1002        wholesale 
1003        HR  
1004        Finance     

  • depID 1002, dep changed from “Sales” to “wholesale”, updating dim_dep table;
  • depID 1003, nothing changed, no action required
  • depID 1004, is a new record, inserting into dim_dep

Assuming the data, originating from other business processes, is now stored in the data lake as CSV files.

Implementing SCD Type 2

Step 1: Read the source

%python 
df_dim_dep_source = spark.read.csv('dbfs:/FileStore/dep.csv', header=True)

df_dim_dep_source.show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Step 2: Read the target

df_dim_dep_target = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_dim_dep_target.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Step 3: Source Left outer Join Target

We perform a source dataframe – df_dim_dep_source, left outer join target dataframe – df_dim_dep_target, where source depID = target depID, and also target’s IsActivity = 1 (meant activity)

This join’s intent is not to miss any new data coming through source. And active records in target because only for those data SCD update is required. After joining source and target, the resultant dataframe can be seen below.

src = df_dim_dep_source
tar = df_dim_dep_target
df_joined = src.join (tar,\
        (src.depid == tar.depID) \
         & (tar.IsActivity == 'true')\
        ,'left') \
    .select(src['*'] \
        , tar.Surrokey.alias('tar_surrokey')\
        , tar.depID.alias('tar_depID')\
        , tar.dep.alias('tar_dep')\
        , tar.StartDate.alias('tar_StartDate')\
        , tar.EndDate.alias('tar_EndDate')\
        , tar.IsActivity.alias('tar_IsActivity')   )
    
df_joined.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1003|       HR|           3|     1003|     HR|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+

Step 4: Filter only the non matched and updated records

In this demo, we only have depid and dep two columns. But in the actual development environment, may have many many columns.

Instead of comparing multiple columns, e.g.,
src_col1 != tar_col1,
src_col2 != tar_col2,
…..
src_colN != tar_colN
We compute hashes for both column combinations and compare the hashes. In addition of this, in case of column’s data type is different, we convert data type the same one.

from pyspark.sql.functions import col , xxhash64

df_filtered = df_joined.filter(\
    xxhash64(col('depid').cast('string'),col('dep').cast('string')) \
    != \
    xxhash64(col('tar_depID').cast('string'),col('tar_dep').cast('string'))\
    )
    
df_filtered.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+

from the result, we can see:

  • The row, dep_id = 1003, dep = HR, was filtered out because both source and target side are the same. No action required.
  • The row, depid =1002, dep changed from “Sales” to “Wholesale”, need updating.
  • The row, depid = 1004, Finance is brand new row, need insert into target side – dimension table.

Step 5: Find out records that will be used for inserting

From above discussion, we have known depid=1002, need updating and depid=1004 is a new rocord. We will create a new column ‘merge_key’ which will be used for upsert operation. This column will hold the values of source id.

Add a new column – “merge_key”

df_inserting = df_filtered. withColumn('merge_key', col('depid'))

df_inserting.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
The above 2 records will be inserted as new records to the target table

The above 2 records will be inserted as new records to the target table.

Step 6: Find out the records that will be used for updating in target table

from pyspark.sql.functions import lit
df_updating = df_filtered.filter(col('tar_depID').isNotNull()).withColumn('merge_key',lit('None')

df_updating.show()
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
| 1003|Wholesale|           3|     1003|   2019-01-01| 9999-12-31|          true|     None|
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
The above record will be used for updating SCD columns in the target table.

This dataframe filters the records that have tar_depID column not null which means, the record already exists in the table for which SCD update has to be done. The column merge_key will be ‘None’ here which denotes this only requires update in SCD cols.

Step 7: Combine inserting and updating records as stage

df_stage_final = df_updating.union(df_instering)

df_stage_final.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     None| <-- updating in SCD table
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002| <-- inserting in SCD table
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004| <-- inserting in SCD table
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
  • records with merge_key as none are for updating in existing dimension table.
  • records with merge_key not null will be inserted as new records in dimension table.

Step 8: Upserting the dim_dep Dimension Table

Before performing the upsert, let’s quickly review the existing dim_dep table and the incoming source data.

# Existing dim_dep table
spark.read.table('dim_dep').show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

# coming updated source data
park.read.csv('dbfs:/FileStore/dep_src.csv', header=True).show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Implementing an SCD Type 2 UpSert on the dim_dep Dimension Table

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, to_date, lit

# define the source DataFrame
src = df_stage_final  # this is a DataFrame object

# Load the target Delta table
tar = DeltaTable.forPath(spark, "dbfs:/mnt/dim")  # target Dimension table


# Performing the UpSert
tar.alias("tar").merge(
    src.alias("src"),
    condition="tar.depID == src.merge_key and tar_IsActivity = 'true'"
).whenMatchedUpdate( \
    set = { \
        "IsActivity": "'false'", \
        "EndDate": "current_date()" \
        }) \
.whenNotMatchedInsert( \
    values = \
    {"depID": "src.depid", \
    "dep": "src.dep", \
    "StartDate": "current_date ()", \
    "EndDate": """to_date('9999-12-31', 'yyyy-MM-dd')""", \
    "IsActivity": "'true' \
    "}) \
.execute()

all done!

Validating the result

spark.read.table('dim_dep').sort(['depID','Surrokey']).show()
+--------+-----+---------+----------+----------+----------+
|Surrokey|depID|      dep| StartDate|   EndDate|IsActivity|
+--------+-----+---------+----------+----------+----------+
|       1| 1001|       IT|2019-01-01|9999-12-31|      true|
|       2| 1002|    Sales|2019-01-01|2020-01-05|     false| <--inactived
|       4| 1002|Wholesale|2020-01-05|9999-12-31|      true| <--updated status
|       3| 1003|       HR|2019-01-01|9999-12-31|      true|
|       5| 1004|  Finance|2020-01-05|9999-12-31|      true| <--append new record
+--------+-----+---------+----------+----------+----------+

Conclusion

we demonstrated how to unlock the power of Slowly Changing Dimension (SCD) Type 2 using Delta Lake, a revolutionary storage layer that transforms data lakes into reliable, high-performance, and scalable repositories.  With this approach, organizations can finally unlock the full potential of their data and make informed decisions with confidence

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

delta: Schema Evolution

Schema Evolution in Databricks refers to the ability to automatically adapt and manage changes in the structure (schema) of a Delta Lake table over time. It allows users to modify the schema of an existing table (e.g., adding or updating columns) without the need for a complete rewrite of the data.

Key Features of Schema Evolution

  1. Automatic Adaptation: Delta Lake can automatically evolve the schema of a table when new columns are added to the incoming data, or when data types change, if certain configurations are enabled.
  2. Backward and Forward Compatibility: Delta Lake ensures that new data can be written to a table without breaking the existing schema. It also ensures that existing queries remain compatible, even if the schema changes.

Configuration for Schema Evolution

mergeSchema

This option allows you to append new data with a schema that differs from the existing table schema. It merges the new schema into the table.

Usage: Typically used when you are appending data.

Schema Merging: Use mergeSchema only for adding new columns, not for incompatible changes.

When new data has additional columns that aren’t present in the target Delta table, Delta Lake can automatically merge the new schema into the existing table schema.


# Append new data to the Delta table with automatic schema merging

df_new_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta-table")


overwriteSchema

This option is used when you want to completely replace the schema of the table with the schema of the new data.

If you want to replace the entire schema (including removing existing columns), you can use the overwriteSchema option.


# Overwrite the existing Delta table schema with new data

df_new_data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/path/to/delta-table")


Configure spark.databricks.delta.schema.autoMerge

You can configure this setting at the following levels:

Usage: Typically used when you are overwriting data

  • Session Level (applies to a specific session or job)
  • Cluster Level (applies to all jobs on the cluster)

Session-Level Configuration (Spark session level)

Once this is enabled, all write and merge operations in the session will automatically allow schema evolution.


# Enable auto schema merging for the session

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

Cluster-Level Configuration

This enables automatic schema merging for all operations on the cluster without needing to set it in each job.

  1. Go to your Databricks Workspace.
  2. Navigate to Clusters and select your cluster.
  3. Go to the Configuration tab.
  4. Under Spark Config, add the following configuration:
    spark.databricks.delta.schema.autoMerge.enabled true

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Data lake vs delta lake vs data lakehouse, and data warehouses comparison

As a data engineer, we often hear terms like Data Lake, Delta Lake, Data Lakehouse, and data warehouse, which might be confusing at times. Today, we’ll explain these terms and talk about the differences of each of the technologies and concepts, along with scenarios of usage for each.

Delta Lake

Delta lake is an open-source technology, a storage layer, built on top of Apache Spark. We use Delta Lake to store data in Delta tables. Delta lake improves data storage by supporting ACID transactions, high-performance query optimizations, schema evolution, data versioning and many other features.

Delta Lake takes your existing Parquet data lake and makes it more reliable and performant by:

  1. Storing all the metadata in a separate transaction log
  2. Tracking all the changes to your data in this transaction log
  3. Organizing your data for maximum query performance

Data Lakehouse

Data lakehouse is a new unified architecture, open data management architecture that combines the flexibility, cost-efficiency, and scale of data lakes with the data management and ACID transactions of data warehouses, enabling business intelligence (BI) and machine learning (ML) on all data.

Data Lake

A data lake is a centralized repository that allows organizations to store vast amounts of structured, semi-structured, and unstructured data. Unlike traditional data warehouses, a data lake retains data in its raw form until it is needed, which provides flexibility in how the data can be used.

Data Warehouse

A data warehouse is a centralized repository that stores structured data (database tables, Excel sheets) and semi-structured data (XML files, webpages) Its data is usually cleaned and standardized for the purposes of reporting and analysis. 

Data lakes vs. data lakehouse vs. data warehouses

follow table simply compared what difference .

 Data lakeData lakehouseData warehouse
Types of dataAll types: Structured data, semi-structured data, unstructured (raw) dataAll types: Structured data, semi-structured data, unstructured (raw) dataStructured data only
Cost$$$$$
FormatOpen formatOpen formatClosed, proprietary format
ScalabilityScales to hold any amount of data at low cost, regardless of typeScales to hold any amount of data at low cost, regardless of typeScaling up becomes exponentially more expensive due to vendor costs
Intended usersLimited: Data scientistsUnified: Data analysts, data scientists, machine learning engineersLimited: Data analysts
ReliabilityLow quality, data swampHigh quality, reliable dataHigh quality, reliable data
Ease of useDifficult: Exploring large amounts of raw data can be difficult without tools to organize and catalog the dataSimple: Provides simplicity and structure of a data warehouse with the broader use cases of a data lakeSimple: Structure of a data warehouse enables users to quickly and easily access data for reporting and analytics
PerformancePoorHighHigh

summary

Data lakes are a good technology that give you flexible and low-cost data storage. Data lakes can be a great choice for you if:

  • You have data in multiple formats coming from multiple sources
  • You want to use this data in many different downstream tasks, e.g. analytics, data science, machine learning, etc.
  • You want flexibility to run many different kinds of queries on your data and do not want to define the questions you want to ask your data in advance
  • You don’t want to be locked into a vendor-specific proprietary table format

Data lakes can also get messy because they do not provide reliability guarantees. Data lakes are also not always optimized to give you the fastest query performance.

Delta Lake is almost always more reliable, faster and more developer-friendly than a regular data lake. Delta lake can be a great choice for you because:

  • You have data in multiple formats coming from multiple sources
  • You want to use this data in many different downstream tasks, e.g. analytics, data science, machine learning, etc.
  • You want flexibility to run many different kinds of queries on your data and do not want to define the questions you want to ask your data in advance
  • You don’t want to be locked into a vendor-specific proprietary table format

Please do not hesitate to contact me if you have any questions at William . chen @mainri.ca 

(remove all space from the email account 😊)