Implementing Slowly Changing Dimension Type 2 Using Delta Lake on Databricks

Built on Apache Spark, Delta Lake provides a robust storage layer for data in Delta tables. Its features include ACID transactions, high-performance queries, schema evolution, and data versioning, among others.

Today’s focus is on how Delta Lake simplifies the management of slowly changing dimensions (SCDs).

A quick review of Slowly Changing Dimension Type 2

A quick recap of SCD Type 2 follows:

  • Storing historical dimension data with effective dates.
  • Keeping a full history of dimension changes (with start/end dates).
  • Adding new rows for dimension changes (preserving history).
# Existing Dimension data
surrokey  depID   dep	StartDate   EndDate     IsActivity
1	  1001	  IT	2019-01-01  9999-12-31  1
2	  1002	  Sales	2019-01-01  9999-12-31  1
3	  1003	  HR	2019-01-01  9999-12-31  1

# Dimension changed and new data comes 
depId dep
1002  wholesale   <--- depID is the same, name changed from "Sales" to "wholesale"
1004  Finance     <--- new data

# the new Dimension will be:
surrokey  depID	dep	   StartDate   EndDate     IsActivity 
1	  1001	IT	   2019-01-01  9999-12-31  1   <-- No action required
2	  1002	Sales	   2019-01-01  2020-12-31  0   <-- mark as inactive
3	  1003	HR	   2019-01-01  9999-12-31  1   <-- No action required
4         1002  wholesale  2021-01-01  9999-12-31  1   <-- add updated active value
5         1004  Finance    2021-01-01  9999-12-31  1   <-- insert new data

Creating demo data

We’re creating a Delta table, dim_dep, and inserting three rows of existing dimension data.

Existing dimension data

%sql
-- Create table dim_dep
create table dim_dep (
  Surrokey BIGINT GENERATED ALWAYS AS IDENTITY
  , depID int
  , dep string
  , StartDate DATE
  , EndDate DATE
  , IsActivity BOOLEAN
)
using delta
location 'dbfs:/mnt/dim/'

-- Insert data
insert into dim_dep (depID, dep, StartDate, EndDate, IsActivity) values
(1001, 'IT',    '2019-01-01', '9999-12-31', true),
(1002, 'Sales', '2019-01-01', '9999-12-31', true),
(1003, 'HR',    '2019-01-01', '9999-12-31', true)

select * from dim_dep
Surrokey depID	dep	StartDate	EndDate	        IsActivity
1	 1001	IT	2019-01-01	9999-12-31	true
2	 1002	Sales	2019-01-01	9999-12-31	true
3	 1003	HR	2019-01-01	9999-12-31	true
%python
dbutils.fs.ls('dbfs:/mnt/dim')
path	name	size	modificationTime
Out[43]: [FileInfo(path='dbfs:/mnt/dim/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/dim/part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', name='part-00000-5f9085db-92cc-4e2b-886d-465924de961b-c000.snappy.parquet', size=1858, modificationTime=1736027755000)]

Incoming source data

The incoming source data may contain new or updated records.

Dimension changed and new data comes 
depId       dep
1002        wholesale 
1003        HR  
1004        Finance     

  • depID 1002: dep changed from “Sales” to “Wholesale”, so the dim_dep table must be updated;
  • depID 1003: nothing changed, no action required;
  • depID 1004: a new record, to be inserted into dim_dep.

Assume this data, produced by other business processes, has landed in the data lake as CSV files.

Implementing SCD Type 2

Step 1: Read the source

%python 
df_dim_dep_source = spark.read.csv('dbfs:/FileStore/dep.csv', header=True)

df_dim_dep_source.show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Step 2: Read the target

df_dim_dep_target = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_dim_dep_target.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Step 3: Source Left outer Join Target

We left outer join the source dataframe (df_dim_dep_source) to the target dataframe (df_dim_dep_target) on source depID = target depID, with the additional condition that the target’s IsActivity = true (i.e. only the currently active rows).

The intent of this join is twofold: no new record arriving through the source is missed, and only the active target records are brought in, because those are the only rows for which an SCD update is required. The resulting dataframe is shown below.

src = df_dim_dep_source
tar = df_dim_dep_target

df_joined = src.join(
        tar,
        (src.depid == tar.depID) & (tar.IsActivity == 'true'),
        'left'
    ).select(
        src['*'],
        tar.Surrokey.alias('tar_surrokey'),
        tar.depID.alias('tar_depID'),
        tar.dep.alias('tar_dep'),
        tar.StartDate.alias('tar_StartDate'),
        tar.EndDate.alias('tar_EndDate'),
        tar.IsActivity.alias('tar_IsActivity')
    )

df_joined.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1003|       HR|           3|     1003|     HR|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+

Step 4: Filter only the non-matched and updated records

In this demo we only have two columns, depid and dep, but a real environment may have many more.

Instead of comparing every column pair, e.g.,
src_col1 != tar_col1,
src_col2 != tar_col2,
…
src_colN != tar_colN
we compute a hash over the source columns and a hash over the target columns, and compare the two hashes. Because the source and target data types may differ, we also cast the columns to a common type (string) before hashing.

from pyspark.sql.functions import col , xxhash64

df_filtered = df_joined.filter(\
    xxhash64(col('depid').cast('string'),col('dep').cast('string')) \
    != \
    xxhash64(col('tar_depID').cast('string'),col('tar_dep').cast('string'))\
    )
    
df_filtered.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+

From the result, we can see:

  • The row with depid = 1003 (dep = HR) was filtered out because source and target are identical. No action required.
  • The row with depid = 1002: dep changed from “Sales” to “Wholesale”, so it needs updating.
  • The row with depid = 1004 (Finance) is brand new and needs to be inserted into the target dimension table.

Step 5: Find out records that will be used for inserting

From the discussion above we know that depid = 1002 needs updating and depid = 1004 is a new record. We create a new column, ‘merge_key’, which will drive the upsert operation; for these rows it holds the source depid.

Add a new column – “merge_key”

df_inserting = df_filtered.withColumn('merge_key', col('depid'))

df_inserting.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002|
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
The above 2 records will be inserted as new records into the target table.

Step 6: Find out the records that will be used for updating in target table

from pyspark.sql.functions import lit
df_updating = df_filtered.filter(col('tar_depID').isNotNull()).withColumn('merge_key', lit('None'))

df_updating.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     None|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
The above record will be used for updating the SCD columns in the target table.

This dataframe keeps only the records whose tar_depID is not null, meaning the record already exists in the target table and an SCD update is required. The merge_key column is set to ‘None’ here to denote that the row only needs its SCD columns (EndDate, IsActivity) updated.

Step 7: Combine inserting and updating records as stage

df_stage_final = df_updating.union(df_inserting)

df_stage_final.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid|      dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     None| <-- updating in SCD table
| 1002|Wholesale|           2|     1002|  Sales|   2019-01-01| 9999-12-31|          true|     1002| <-- inserting in SCD table
| 1004|  Finance|        null|     null|   null|         null|       null|          null|     1004| <-- inserting in SCD table
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
  • Records with merge_key = None are used to update (close out) the existing rows in the dimension table.
  • Records with a non-null merge_key are inserted as new rows into the dimension table.

Step 8: Upserting the dim_dep Dimension Table

Before performing the upsert, let’s quickly review the existing dim_dep table and the incoming source data.

# Existing dim_dep table
spark.read.table('dim_dep').show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

# incoming updated source data
spark.read.csv('dbfs:/FileStore/dep.csv', header=True).show()
+-----+---------+
|depid|      dep|
+-----+---------+
| 1002|Wholesale|
| 1003|       HR|
| 1004|  Finance|
+-----+---------+

Implementing an SCD Type 2 UpSert on the dim_dep Dimension Table

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, to_date, lit

# define the source DataFrame
src = df_stage_final  # this is a DataFrame object

# Load the target Delta table
tar = DeltaTable.forPath(spark, "dbfs:/mnt/dim")  # target Dimension table


# Perform the upsert
tar.alias("tar").merge(
    src.alias("src"),
    condition="tar.depID = src.merge_key AND tar.IsActivity = true"
).whenMatchedUpdate(
    set = {
        "IsActivity": "false",
        "EndDate": "current_date()"
    }
).whenNotMatchedInsert(
    values = {
        "depID": "src.depid",
        "dep": "src.dep",
        "StartDate": "current_date()",
        "EndDate": "to_date('9999-12-31', 'yyyy-MM-dd')",
        "IsActivity": "true"
    }
).execute()

all done!

Validating the result

spark.read.table('dim_dep').sort(['depID','Surrokey']).show()
+--------+-----+---------+----------+----------+----------+
|Surrokey|depID|      dep| StartDate|   EndDate|IsActivity|
+--------+-----+---------+----------+----------+----------+
|       1| 1001|       IT|2019-01-01|9999-12-31|      true|
|       2| 1002|    Sales|2019-01-01|2020-01-05|     false| <-- closed out (inactive)
|       4| 1002|Wholesale|2020-01-05|9999-12-31|      true| <-- new active version
|       3| 1003|       HR|2019-01-01|9999-12-31|      true|
|       5| 1004|  Finance|2020-01-05|9999-12-31|      true| <-- new record appended
+--------+-----+---------+----------+----------+----------+

Conclusion

We demonstrated how to implement Slowly Changing Dimension (SCD) Type 2 on Databricks using Delta Lake. Delta Lake’s ACID transactions and MERGE support make the close-out-and-insert pattern straightforward to express, so dimension history can be maintained reliably and at scale.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

PySpark DataFrame

PySpark DataFrame is a distributed collection of rows, similar to a table in a relational database or a DataFrame in Python’s pandas library. It provides powerful tools for querying, transforming, and analyzing large-scale structured and semi-structured data.
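
The snippets in the rest of this reference assume an active SparkSession named spark and, in most places, a sample DataFrame df. A minimal sketch to set that up (on Databricks the spark session already exists; the sample data here is purely illustrative):

from pyspark.sql import SparkSession

# On Databricks `spark` is pre-created; this is only needed in a standalone script
spark = SparkSession.builder.appName("pyspark-reference").getOrCreate()

# Illustrative sample DataFrame
data = [("James", "Sales", 3000), ("Anna", "Finance", 4100)]
df = spark.createDataFrame(data, ["name", "department", "salary"])
df.show()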

PySpark apply functions

Apply a function to a column

df.withColumn("Upper_Name", upper(df.Name))
df.select("Seqno","Name", upper(df.Name))

df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB")

def upperCase(str):
    return str.upper()
upperCaseUDF = udf(upperCase,StringType())
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB")
collect ( )

collect() is an action operation that retrieves all the elements of the dataset (from all worker nodes) to the driver node.

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
dataCollect = deptDF.collect()
print(dataCollect)
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
Column Class

Access column

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)
+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

Column Operators

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+
# Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

df.select(df.col2 < df.col3).show()
+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

df.select(df.col2 == df.col3).show()
+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+
Convert DataFrame to Pandas

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.

pandasDF = pysparkDF.toPandas()
Convert RDD to DataFrame
df = rdd.toDF()
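
A minimal sketch, assuming an active SparkSession named spark (the RDD contents and column names below are illustrative):

# Build an RDD of tuples, then convert it to a DataFrame
rdd = spark.sparkContext.parallelize([("James", 34), ("Anna", 28)])

df = rdd.toDF()                        # default column names: _1, _2
df_named = rdd.toDF(["name", "age"])   # with explicit column names
df_named.show()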
Create an empty DataFrame

Create an empty DataFrame

#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create empty RDD
emptyRDD = spark.sparkContext.emptyRDD()

#Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
dropDuplicates(), distinct()

Key differences between distinct() and dropDuplicates():

  • distinct() considers all columns when identifying duplicates, while dropDuplicates() allows you to specify a subset of columns to determine uniqueness.
  • distinct() treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them is retained after applying distinct().
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |  #duplicated
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df.distinct().show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
Distinct count: 9
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

df2 = df.dropDuplicates(["department"])
print("Distinct count: "+str(df2.count()))
Distinct count: 3
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
+-------------+----------+------+
fillna() & fill()

DataFrame.fillna() and DataFrameNaFunctions.fill() are used to replace NULL/None values.

# Prepare Data
data = [("James", None, 3000), \
    ("Michael", "Sales", None), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", None), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", None, 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |null      |3000  |
|Michael      |Sales     |null  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |null  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |null      |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
#Replace null with 0 for all integer columns
df.na.fill(value=0).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|      null|  3000|
|      Michael|     Sales|     0|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|     0|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|      null|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
#Replace null with "unknown" on only the department column
df.na.fill(value="unknown",subset=["department"]).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|   unknown|  3000|
|      Michael|     Sales|  null|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  null|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|   unknown|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
groupBy ( )

groupBy(), similar to the SQL GROUP BY clause, is a transformation used to group rows that have the same values in the specified columns into summary rows.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|     Sales|   NY|     176000|     30000|
|     Sales|   CA|      81000|     23000|
|   Finance|   CA|     189000|     47000|
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
| Marketing|   CA|      80000|     18000|
+----------+-----+-----------+----------+
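
Beyond a single aggregate such as sum(), groupBy() can be combined with agg() to compute several aggregates at once. A short sketch against the same employee DataFrame (column names as shown above):

from pyspark.sql.functions import sum, avg, max

df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"),
         avg("salary").alias("avg_salary"),
         max("bonus").alias("max_bonus")) \
    .show()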
join ( )
  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame.
  • Right Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame.
  • Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows.
  • Left Semi Join: Returns all rows from the left DataFrame where there is a match in the right DataFrame.
  • Left Anti Join: Returns all rows from the left DataFrame where there is no match in the right DataFrame.
  • Self Join: Joins a DataFrame with itself, using aliases to distinguish the two sides.
# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
# Inner join
deptDF.join(empDF, deptDF.dept_id==empDF.emp_dept_id, "inner").show()
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
# Left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Returns only the left-hand rows that have a match on the right-hand side (only left-hand columns are kept).
# Left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Returns only the left-hand rows that have no match on the right-hand side.
# Self join
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
orderBy( ) and sort( )

orderBy() and sort() can be used interchangeably.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
# Sorting different columns in different orders

df.sort("state", "age",ascending=[False,True]).show()
df.sort(df["state"].desc(), df["age"].asc()).show()
df.orderBy("state", "age",ascending=[False,True]).show()
df.orderBy(df["state"].desc(), df["age"].asc()).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+
partitionBy ( )

partitionBy() is a DataFrameWriter method that partitions the output files by the values of one or more columns when writing to disk; see the sketch below.
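
A minimal sketch, assuming the employee DataFrame used in the sections above and an illustrative output path:

# Write the DataFrame partitioned by the 'state' column
# (one sub-directory per distinct state value; the path is illustrative)
df.write.partitionBy("state").mode("overwrite").parquet("dbfs:/tmp/employees_by_state")

# Reading it back; filtering on 'state' lets Spark prune partitions
spark.read.parquet("dbfs:/tmp/employees_by_state").filter("state = 'NY'").show()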

pivot ( ) & Unpivot ( )

pivot() (Row to Column)

# Create sample data
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
# Output
root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

# Applying pivot()
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

# Output
root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

# one more example
pivotDF = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
root
 |-- Country: string (nullable = true)
 |-- Banana: long (nullable = true)
 |-- Beans: long (nullable = true)
 |-- Carrots: long (nullable = true)
 |-- Orange: long (nullable = true)

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|China  |400   |1500 |1200   |4000  |
|USA    |1000  |1600 |1500   |4000  |
|Mexico |null  |2000 |null   |null  |
|Canada |2000  |null |2000   |null  |
+-------+------+-----+-------+------+

Unpivot 

PySpark SQL doesn’t have a dedicated unpivot function, so we use the stack() expression instead (applied here to the first pivotDF, which has Product rows and Country columns).

# Applying unpivot()
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+
sample(), sampleBy()

sample() returns a random subset of records from the dataset.

Syntax

sample(withReplacement, fraction, seed=None)

  • fraction – Fraction of rows to generate, range [0.0, 1.0]. Note that it doesn’t guarantee to provide the exact number of the fraction of records.
  • seed – Seed for sampling (default a random seed). Used to reproduce the same random sampling.
  • withReplacement – Sample with replacement or not (default False).
df=spark.range(100)
print(df.sample(0.06).collect())

#Output: [Row(id=0), Row(id=2), Row(id=17), Row(id=25), Row(id=26), Row(id=44), Row(id=80)]

In the example above, the DataFrame has 100 records and we requested a 6% sample (about 6 rows), but sample() returned 7 records. This shows that sample() does not guarantee the exact fraction specified.

To get the same random sample on every run, use the same seed value. Change the seed to get a different sample.

print(df.sample(0.1,123).collect())
//Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,123).collect())
//Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,456).collect())
//Output: 19,21,42,48,49,50,75,80

sampleBy()

sampleBy(col, fractions, seed=None)

df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

//Output: [Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]
select ( )
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+
# Select columns by different ways
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

# By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Nested Struct Columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+
# Select child columns
df2.select("name.firstname","name.lastname").show(truncate=False)
+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+
# Select all child columns
df2.select("name.*").show(truncate=False)
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+
show ( )
df.show()
+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+
df.show(truncate=False)
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)
-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
only showing top 3 rows
StructType & StructField

StructType

Defines the structure of the DataFrame. StructType represents a schema, which is a collection of StructField objects. A StructType is essentially a list of fields, each with a name and data type, defining the structure of the DataFrame. It allows for the creation of nested structures and complex data types.

StructField

StructField – Defines the metadata of the DataFrame column. It represents a field in the schema, containing metadata such as the name, data type, and nullable status of the field. Each StructField object defines a single column in the DataFrame, specifying its name and the type of data it holds.

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

nested StructType

# nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+
transform ( )

transform() chains custom transformation functions: each function takes a DataFrame (plus optional arguments) and returns a new DataFrame.

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# PySpark transform() Usage
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount)
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+
UDF

UDF (User Defined Function)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
#create a python function
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 
#Convert a Python function to PySpark UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z), StringType())
# or equivalently:
convertUDF = udf(convertCase, StringType())

df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

Registering PySpark UDF & use it on SQL

In order to use convertCase() function on PySpark SQL, you need to register the function with PySpark by using spark.udf.register().

spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
union ( ) & unionAll ( )

union ( ) & unionAll ( ) are the same result. unionAll is older, retired


df1
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+
df2
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
df1.union(df2).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|<--duplicated
|Maria        |Finance   |CA   |90000 |24 |23000|<--duplicated
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
unionByName ( )

df1.unionByName(df2, allowMissingColumns=True)

unionByName() matches columns by name, so the column order can differ between df1 and df2; with allowMissingColumns=True, even the schemas can differ.

df1
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+
df2
+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+
# different columns order
df3 = df1.unionByName(df2)
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+
# different columns name and order 
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+
df3=df1.unionByName(df2, allowMissingColumns=True).show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+
where() & filter()

where() & filter() can replace each other

  • Use &, |, ~ for logical operations (AND, OR, NOT).
  • Use ==, !=, >, <, >=, <= for comparisons.
  • Always wrap column references in col() for clarity.
  • For SQL-like patterns, consider using functions like like, isin, and between.
  • IS NULL –> “isNull ( )”
  • IS NOT NULL –> “isNotNull ( )”
  • LIKE –> “like ( %abc% )”
  • IN –> “isin (18, 21, 25)”
  • BETWEEN –> “between(18, 25)”
+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
df.select("gender").filter(df.gender == "M").show()
df.select("gender").where(df.gender == "F").show()
+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     M|
+------+

+------+
|gender|
+------+
|     F|
|     F|
+------+
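
A short sketch of the SQL-like patterns listed above, run against the same DataFrame (only 'state' and 'gender' exist in this sample; the 'age' column is hypothetical):

from pyspark.sql.functions import col

# IS NULL / IS NOT NULL
df.filter(col("state").isNull()).show()
df.filter(col("state").isNotNull()).show()

# LIKE and IN
df.filter(col("state").like("N%")).show()        # states starting with 'N'
df.filter(col("state").isin("OH", "NY")).show()  # state is in the given list

# BETWEEN, shown on a hypothetical numeric column 'age'
# df.filter(col("age").between(18, 25)).show()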
withColumn()
from pyspark.sql.functions import date_add, col
df.withColumn("dob", date_add("dob", 10)).\
withColumn("newsalary",col("salary")*100).\
drop("middlename").show()
+---------+--------+----------+------+------+---------+
|firstname|lastname|       dob|gender|salary|newsalary|
+---------+--------+----------+------+------+---------+
|    James|   Smith|1991-04-11|     M|   300|    30000|
|  Michael|        |2000-05-29|     M|   400|    40000|
|   Robert|Williams|1978-09-15|     M|   400|    40000|
|    Maria|   Jones|1967-12-11|     F|   400|    40000|
|      Jen|   Brown|1980-02-27|     F|    -1|     -100|
+---------+--------+----------+------+------+---------+
withColumnRenamed ( )

withColumnRenamed() renames a DataFrame column; we often need to rename one, several, or all columns of a PySpark DataFrame.

+--------------------+----------+------+------+
|                name|       dob|gender|salary|
+--------------------+----------+------+------+
|    {James, , Smith}|1991-04-01|     M|  3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|    -1|
+--------------------+----------+------+------+
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.show()
+--------------------+-----------+------+-------------+
|                name|DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|    {James, , Smith}| 1991-04-01|     M|         3000|
|   {Michael, Rose, }| 2000-05-19|     M|         4000|
|{Robert, , Williams}| 1978-09-05|     M|         4000|
|{Maria, Anne, Jones}| 1967-12-01|     F|         4000|
|  {Jen, Mary, Brown}| 1980-02-17|     F|           -1|
+--------------------+-----------+------+-------------+


deltaTable vs DataFrames

In Databricks and PySpark, DeltaTables and DataFrames both handle structured data but differ in functionality and use cases. Here’s a detailed comparison:

Definitions

DeltaTable

A Delta table is a storage format built on Apache Parquet data files plus a transaction log, with support for ACID transactions, versioning, schema enforcement, and advanced file operations. It is managed by the Delta Lake protocol, offering features like time travel, upserts, and deletes.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is an abstraction for structured and semi-structured data in Spark. It is a purely in-memory abstraction and does not directly manage storage or transactions.

Features

Feature             | DeltaTable                                            | DataFrame
--------------------+-------------------------------------------------------+---------------------------------------------------
Persistence         | Stores data on disk in a managed format.              | Primarily an in-memory abstraction (ephemeral).
Schema Enforcement  | Enforces schema when writing/updating.                | No schema enforcement unless explicitly specified.
ACID Transactions   | Supports atomic writes, updates, and deletes.         | Not transactional; changes require reprocessing.
Versioning          | Maintains historical versions (time travel).          | No versioning; a snapshot of data.
Upserts and Deletes | Supports MERGE, UPDATE, and DELETE.                   | Does not directly support these operations.
Performance         | Optimized for storage (Z-order indexing, compaction). | Optimized for in-memory transformations.
Time Travel         | Query historical data using snapshots.                | No time travel support.
Indexing            | Supports indexing (Z-order, data skipping).           | No indexing capabilities.

Use Cases

DeltaTable

Ideal for persistent storage with advanced capabilities:

  • Data lakes or lakehouses.
  • ACID-compliant operations (e.g., MERGE, DELETE).
  • Time travel to access historical data.
  • Optimizing storage with compaction or Z-ordering.
  • Schema evolution during write operations.

DataFrame

Best for in-memory processing and transformations:

  • Ad-hoc queries and ETL pipelines.
  • Working with data from various sources (files, databases, APIs).
  • Temporary transformations before persisting into Delta or other formats.

Common APIs

DeltaTable

Load Delta table from a path:

from delta.tables import DeltaTable 
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

Merge data:

delta_table.alias("target").merge( 
source_df.alias("source"), 
"target.id = source.id" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Time Travel:

df = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta/table")

Optimize

OPTIMIZE '/path/to/delta/table' ZORDER BY (column_name);
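
If you prefer the Python API (available in Delta Lake 2.0+), roughly the same optimization can be expressed as below; this is a sketch and the path is illustrative:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
# Compact small files and Z-order the data by the given column
delta_table.optimize().executeZOrderBy("column_name")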

DataFrame

Read

df = spark.read.format("parquet").load("/path/to/data")

Transformations

transformed_df = df.filter(df.age > 30).groupBy("gender").count()

Write

df.write.format("delta").save("/path/to/save")

Transition Between DeltaTables and DataFrames

Convert DeltaTable to DataFrame:

df = delta_table.toDF()

Write DataFrame to Delta format:

df.write.format("delta").save("/path/to/delta/table")


Summary of Dataframe Methods

Summary of Dataframe Methods

Category       | Method                               | Example
---------------+--------------------------------------+---------------------------------------------------------------------
Inspection     | printSchema()                        | df.printSchema()
               | columns                              | df.columns
Selection      | select()                             | df.select("col1", "col2").show()
               | withColumn()                         | df.withColumn("new_col", col("col1") + 1)
               | withColumnRenamed()                  | df.withColumnRenamed("old", "new")
               | distinct()                           | df.select("cut").distinct().show()
               | take(5)                              | df.take(5)  # retrieve the first 5 rows
               | drop()                               | df.drop("col").show()  # drop a column
Filtering      | filter(), where()                    | df.filter(df.col1 > 10).show() / df.where(df.col1 > 10).show()
Aggregations   | groupBy().agg()                      | df.groupBy("col").agg(sum("val")).show()
               | count()                              | df.count()
Joins          | join()                               | df1.join(df2, df1.id == df2.id, "inner")
               | left join                            | df1.join(df2, df1.id == df2.id, "left_outer").show()
Sorting        | orderBy(), sort()                    | df.orderBy("col1").show() / df.sort(df.col1.desc()).show()
Null Handling  | dropna(), fillna()                   | df.fillna({"col1": 0}).show()
               | isNotNull()                          | df.filter(col("cut").isNotNull()).show()
               | isNull()                             | df.filter(col("cut").isNull()).show()
               | dropna()                             | df.dropna(subset=["col1"]).show()
Date/Time      | year(), month(), dayofmonth()        | df.withColumn("year", year("date_col"))
Writing        | write.format()                       | df.write.csv("path/to/csv", header=True); df.write.json("path/to/json")
Save as Table  | write.format().mode().saveAsTable()  | df.write.format("delta").saveAsTable("my_table")
Create as View | createOrReplaceTempView()            | df.createOrReplaceTempView("temp_view_name")
               | createOrReplaceGlobalTempView()      | df.createOrReplaceGlobalTempView("global_temp_view_name")
String Ops     | upper(), concat()                    | df.withColumn("upper", upper("col"))
Partitioning   | partitionBy("col")                   | df.write.partitionBy("department").parquet("output/parquet_data")
               | repartition(4)                       | df.repartition(4)  # repartition into 4 partitions
               | coalesce(2)                          | df.coalesce(2)  # reduce to 2 partitions


arrayType, mapType column and functions

In PySpark, ArrayType and MapType are used to define complex data structures within a DataFrame schema.
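
A minimal sketch of declaring both types in a schema (the field names and data are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("numbers", ArrayType(IntegerType()), True),                # array of integers
    StructField("properties", MapType(StringType(), StringType()), True)   # string -> string map
])

data = [(1, [1, 2, 3], {"color": "red"}),
        (2, [4, 5, 6], {"color": "blue"})]
df_complex = spark.createDataFrame(data, schema)
df_complex.printSchema()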

ArrayType column and functions

ArrayType allows you to store and work with arrays, which can hold multiple values of the same data type.

sample dataframe:
+---+---------+
| id| numbers |
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+
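
The array examples that follow assume this df exists; a minimal sketch to build it:

# Build the sample DataFrame used by the examples below
df = spark.createDataFrame(
    [(1, [1, 2, 3]), (2, [4, 5, 6]), (3, [7, 8, 9])],
    ["id", "numbers"]
)
df.show()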

explode ()

“explode” a given array into individual new rows using the explode function; it is often used to flatten JSON.

from pyspark.sql.functions import explode

# Explode the 'numbers' array into separate rows
exploded_df = df.withColumn("number", explode(df.numbers))
display(exploded_df)
==output==
id	numbers	number
1	[1,2,3]	1
1	[1,2,3]	2
1	[1,2,3]	3
2	[4,5,6]	4
2	[4,5,6]	5
2	[4,5,6]	6
3	[7,8,9]	7
3	[7,8,9]	8
3	[7,8,9]	9
split ()

Splits a string column on a specified delimiter and returns an array column.

from pyspark.sql.functions import split
df.withColumn("Name_Split", split(df["Name"], ","))

sample dataframe
+--------------+
|Name          |
+--------------+
|John,Doe      |
|Jane,Smith    |
|Alice,Cooper  |
+--------------+

from pyspark.sql.functions import split
# Split the 'Name' column by comma
df_split = df.withColumn("Name_Split", split(df["Name"], ","))

==output==
+-------------+----------------+
| Name        | Name_Split     |
+-------------+----------------+
| John,Doe    | [John, Doe]    |
| Jane,Smith  | [Jane, Smith]  |
| Alice,Cooper| [Alice, Cooper]|
+-------------+----------------+
array ()

Creates an array column.

from pyspark.sql.functions import array, col
data=[(1,2,3),(4,5,6)]
schema=['num1','num2','num3']
df1=spark.createDataFrame(data,schema)
df1.show()
# create a new column - numbers, array type. elements use num1,num2,num3   
df1.withColumn("numbers",array(col("num1"),col("num2"),col("num3"))).show()

==output==
+----+----+----+
|num1|num2|num3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

#new array column "numbers" created
+----+----+----+-----------+
|num1|num2|num3| numbers   |
+----+----+----+-----------+
|   1|   2|   3| [1, 2, 3] |
|   4|   5|   6| [4, 5, 6] |
+----+----+----+-----------+
array_contains ()

Checks if an array contains a specific element.

from pyspark.sql.functions import array_contains
array_contains(array, value)

sample dataframe
+---+-----------------------+
|id |fruits                 |
+---+-----------------------+
|1  |[apple, banana, cherry]|
|2  |[orange, apple, grape] |
|3  |[pear, peach, plum]    |
+---+-----------------------+

from pyspark.sql.functions import array_contains

# Using array_contains to check if the array contains 'apple'
df.select("id", array_contains("fruits", "apple").alias("has_apple")).show()

==output==
+---+----------+
| id|has_apple |
+---+----------+
|  1|      true|
|  2|      true|
|  3|     false|
+---+----------+
getItem()

Access individual elements of an array by their index using the getItem() method

# Select the second element (index start from 0) of the 'numbers' array
df1 = df.withColumn("item_1_value",   df.numbers.getItem(1))
display(df1)
==output==
id	numbers	      item_1_value
1	[1,2,3]	       2
2	[4,5,6]	       5
3	[7,8,9]	       8
size ()

Returns the size of the array.

from pyspark.sql.functions import size

# Get the size of the 'numbers' array
df.select(size(df.numbers)).show()

==output==
+-------------+
|size(numbers)|
+-------------+
|            3|
|            3|
|            3|
+-------------+
sort_array()

Sorts the array elements.

sort_array(col: 'ColumnOrName', asc: bool = True)

If asc is True (the default), the sort is ascending; if False, descending. When asc=True it can be omitted.

from pyspark.sql.functions import sort_array
df.withColumn("numbers", sort_array("numbers")).show()
==output==
ascending 
+---+---------+
| id|  numbers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+
df.select(sort_array("numbers", asc=False).alias("sorted_desc")).show()
==output==
descending 
+-----------+
|sorted_desc|
+-----------+
|  [3, 2, 1]|
|  [6, 5, 4]|
|  [9, 8, 7]|
+-----------+
concat ()

concat() is used to concatenate arrays (or strings) into a single array (or string). When dealing with ArrayType, concat() is typically used to combine two or more arrays into one.

from pyspark.sql.functions import concat
concat(*cols)

sample DataFrame
+---+------+------+
|id |array1|array2|
+---+------+------+
|1  |[a, b]|[x, y]|
|2  |[c]   |[z]   |
|3  |[d, e]|null  |
+---+------+------+

from pyspark.sql.functions import concat

# Concatenating array columns
df_concat = df.withColumn("concatenated_array", concat(col("array1"), col("array2")))
df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |null              |
+---+------+------+------------------+

Handling null Values

If any of the input columns are null, the entire result can become null. This is why you’re seeing null instead of just the non-null array.

To handle this, you can use coalesce() to substitute null with an empty array before performing the concat(). coalesce() returns the first non-null argument. Here’s how you can modify your code:

from pyspark.sql.functions import concat, coalesce, col, array

# Define an empty array to substitute for null values
empty_array = array()

# Concatenate with null handling using coalesce
df_concat = df.withColumn(
    "concatenated_array",
    concat(coalesce(col("array1"), empty_array), coalesce(col("array2"), empty_array))
)

df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |[d, e]            |
+---+------+------+------------------+
arrays_zip ()

Combines multiple arrays element-wise into a single array of structs.
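A small sketch, reusing the array1/array2 sample DataFrame from the concat() example above (the "zipped" column name is illustrative):

from pyspark.sql.functions import arrays_zip, col

# Zip array1 and array2 element-wise into an array of structs
df.withColumn("zipped", arrays_zip(col("array1"), col("array2"))).show(truncate=False)

Each pair of elements with the same index ends up in one struct; if the arrays have different lengths, the shorter one is padded with null.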


MapType column and functions

MapType is used to represent key-value pairs, similar to a Python dictionary (dict).

from pyspark.sql.types import MapType, StringType, IntegerType
# Define a MapType
my_map = MapType(StringType(), IntegerType(), valueContainsNull=True)

Parameters:

  • keyType: Data type of the keys in the map. You can use PySpark data types like StringType(), IntegerType(), DoubleType(), etc.
  • valueType: Data type of the values in the map. It can be any valid PySpark data type
  • valueContainsNull: Boolean flag (optional). It indicates whether null values are allowed in the map. Default is True.

sample dataset
# Sample dataset (product ID and prices in various currencies)
data = [
    (1, {"USD": 100, "EUR": 85, "GBP": 75}),
    (2, {"USD": 150, "EUR": 130, "GBP": 110}),
    (3, {"USD": 200, "EUR": 170, "GBP": 150}),
]
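
A minimal sketch that turns this sample dataset into a DataFrame with an explicit MapType column (the schema and variable names are illustrative):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, MapType

# product_id is an integer, prices is a map of currency code -> price
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("prices", MapType(StringType(), IntegerType()), True)
])

df = spark.createDataFrame(data, schema)
df.show(truncate=False)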


sample dataframe
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+

Accessing map_keys (), map_values ()

Extract keys (currency codes) and values (prices):

from pyspark.sql.functions import col, map_keys, map_values
# Extract map keys and values
df.select(
    col("product_id"),
    map_keys(col("prices")).alias("currencies"),
    map_values(col("prices")).alias("prices_in_currencies")
).show(truncate=False)

==output==
+----------+---------------+--------------------+
|product_id|currencies     |prices_in_currencies|
+----------+---------------+--------------------+
|1         |[EUR, GBP, USD]|[85, 75, 100]       |
|2         |[EUR, GBP, USD]|[130, 110, 150]     |
|3         |[EUR, GBP, USD]|[170, 150, 200]     |
+----------+---------------+--------------------+
explode ()

Use explode() to flatten the map into multiple rows, where each key-value pair from the map becomes a separate row.

from pyspark.sql.functions import explode
# Use explode to flatten the map into (currency, price) rows
df_exploded = df.select("product_id", explode("prices").alias("currency", "price"))
df_exploded.show()

==output==
+----------+--------+-----+
|product_id|currency|price|
+----------+--------+-----+
|         1|     EUR|   85|
|         1|     GBP|   75|
|         1|     USD|  100|
|         2|     EUR|  130|
|         2|     GBP|  110|
|         2|     USD|  150|
|         3|     EUR|  170|
|         3|     GBP|  150|
|         3|     USD|  200|
+----------+--------+-----+
Accessing specific elements in the map

To get the price for a specific currency (e.g., USD) for each product:

from pyspark.sql.functions import col, map_keys, map_values
# Access the value for a specific key in the map 
df.select(
    col("product_id"),
    col("prices").getItem("USD").alias("price_in_usd")
).show(truncate=False)

==output==
+----------+------------+
|product_id|price_in_usd|
+----------+------------+
|1         |100         |
|2         |150         |
|3         |200         |
+----------+------------+
filtering

Filter rows based on conditions involving the map values.

from pyspark.sql.functions import col, map_keys, map_values
# Filter rows where price in USD is greater than 150
df.filter(col("prices").getItem("USD") > 150).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
map_concat ()

Combines two or more map columns by merging their key-value pairs.

from pyspark.sql.functions import map_concat, create_map, lit

# Define the additional currency as a new map using create_map()
additional_currency = create_map(lit("CAD"), lit(120))

# Add a new currency (e.g., CAD) with a fixed price to all rows
df.withColumn(
    "updated_prices",
    map_concat(col("prices"), additional_currency)
).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

withColumnRenamed(), drop(), show()

withColumnRenamed ()

withColumnRenamed() renames an existing column in a DataFrame.

DataFrame.withColumnRenamed(existingName, newName)

existingName: The current name of the column you want to rename.
newName: The new name for the column.
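
A short illustrative sketch (the column names here are hypothetical):

# Rename column 'dep' to 'department'
df_renamed = df.withColumnRenamed("dep", "department")
df_renamed.printSchema()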

drop ()

In PySpark, you can drop columns from a DataFrame using the drop() method. Here’s a breakdown of the syntax, options, parameters, and examples for dropping columns in PySpark.

Syntax

DataFrame.drop(*cols)

*cols: One or more column names (as strings) that you want to drop from the DataFrame. You can pass these as individual arguments or a list of column names.

# Drop single column 'age'
df_dropped = df.drop("age")

# Drop multiple columns 'id' and 'age'
df_dropped_multiple = df.drop("id", "age")

# Dropping Columns Using a List of Column Names
# Define columns to drop
columns_to_drop = ["id", "age"]

# Drop columns using list
df_dropped_list = df.drop(*columns_to_drop)
df_dropped_list.show()
show ()

By default, show() displays 20 rows and truncates string values to 20 characters.

df.show(n=10, truncate=True, vertical=True)

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS

When your Delta tables reside in Blob Storage or Azure Data Lake Storage (ADLS), you interact with them directly using their file paths. This differs from how you might access tables managed within a metastore like Unity Catalog, where you’d use a cataloged name.

Reading Delta Tables from Blob Storage or ADLS

To read Delta tables from Blob Storage or ADLS, specify the path to the Delta table and use the delta format.

Syntax

# Spark SQL
SELECT * FROM delta.`/mnt/path/to/delta/table`
caution: " ` " - backticks

# PySpark
df = spark.read.format("delta").load("path/to/delta/table")

Writing Delta Tables to Blob Storage or ADLS

When writing to Delta tables, use the delta format and specify the path where you want to store the table.

Spark SQL cannot directly write to a Delta table in Blob or ADLS (use PySpark for this). However, you can run SQL queries and insert into a Delta table using INSERT INTO:

# SparkSQL
INSERT INTO delta.`/mnt/path/to/delta/table` SELECT * FROM my_temp_table
caution: " ` " - backticks

# PySpark 
df.write.format("delta").mode("overwrite").save("path/to/delta/table")

Options and Parameters for Delta Read/Write

Options for Reading Delta Tables:

You can configure the read operation with options like:

  • mergeSchema: Allows schema evolution if the structure of the Delta table changes.
  • spark.sql.files.ignoreCorruptFiles: Ignores corrupt files during reading.
  • versionAsOf / timestampAsOf: Enable querying older versions of the Delta table (time travel).
df = spark.read.format("delta").option("mergeSchema", "true").load("path/to/delta/table")
df.show()

Options for Writing Delta Tables:

mode: Controls the write mode.

  • overwrite: Overwrites the existing data.
  • append: Adds to existing data.
  • ignore: Ignores the write if data exists.
  • errorifexists: Defaults to throwing an error if data exists.

partitionBy: Partition the data by one or more columns.

overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("column_name") \
    .save("path/to/delta/table")

Time Travel and Versioning with Delta (PySpark)

Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()

# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()

Conclusion:

  • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
  • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
  • For reading, use .format("delta").load("path").
  • For writing, use .write.format("delta").save("path").

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Read table from Unity Catalog and write table to Unity Catalog

To read from and write to Unity Catalog in PySpark, you typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name.

Reading from Unity Catalog

To read a table from Unity Catalog, specify the table’s full path:

# Reading a table
df = spark.read.table("catalog.schema.table")
df.show()

# Using Spark SQL
df = spark.sql("SELECT * FROM catalog.schema.table")

Writing to Unity Catalog

To write data to Unity Catalog, you specify the table name in the saveAsTable method:

# Writing a DataFrame to a new table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog.schema.new_table")

Options for Writing to Unity Catalog:

  • format: Set to "delta" for Delta Lake tables, as Unity Catalog uses Delta format.
  • mode: Options include overwrite, append, ignore, and error.

Example: Read, Transform, and Write Back to Unity Catalog

# Read data from a Unity Catalog table
df = spark.read.table("catalog_name.schema_name.source_table")

# Perform transformations
transformed_df = df.filter(df["column_name"] > 10)

# Write transformed data back to a different table
transformed_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog_name.schema_name.target_table")

Comparison of Delta, JSON, and CSV Reads/Writes

Delta
  • Unity Catalog
    Read:  df = spark.read.table("catalog.schema.table")
    Write: df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.table")
    Notes: Unity Catalog natively supports Delta with schema enforcement and versioning.
  • Blob/ADLS
    Read:  df = spark.read.format("delta").load("path/to/delta/folder")
    Write: df.write.format("delta").mode("overwrite").save("path/to/delta/folder")
    Notes: Requires the Delta Lake library; supports ACID transactions and time-travel capabilities.

JSON
  • Unity Catalog
    Read:  Not directly supported; typically read as a Delta table or temporary table.
    Write: Not directly supported; must be converted to Delta format before writing to Unity Catalog.
    Notes: Convert JSON to Delta format to enable integration with Unity Catalog.
  • Blob/ADLS
    Read:  df = spark.read.json("path/to/json/files")
    Write: df.write.mode("overwrite").json("path/to/json/folder")
    Notes: Simple structure, no schema enforcement by default; ideal for semi-structured data.

CSV
  • Unity Catalog
    Read:  Not directly supported; CSV files should be imported as Delta tables or temporary views.
    Write: Not directly supported; convert to Delta format for compatibility with Unity Catalog.
    Notes: Similar to JSON, requires conversion for use in Unity Catalog.
  • Blob/ADLS
    Read:  df = spark.read.option("header", True).csv("path/to/csv/files")
    Write: df.write.option("header", True).mode("overwrite").csv("path/to/csv/folder")
    Notes: Lacks built-in schema enforcement; additional steps needed for ACID or schema evolution.

Detailed Comparison and Notes:

  1. Unity Catalog
    • Delta: Unity Catalog fully supports Delta format, allowing for schema evolution, ACID transactions, and built-in security and governance.
    • JSON and CSV: To use JSON or CSV in Unity Catalog, convert them into Delta tables or load them as temporary views before making them part of Unity’s governed catalog. This is because Unity Catalog enforces structured data formats with schema definitions.
  2. Blob Storage & ADLS (Azure Data Lake Storage)
    • Delta: Blob Storage and ADLS support Delta tables if the Delta Lake library is enabled. Delta on Blob or ADLS retains most Delta features but may lack some governance capabilities found in Unity Catalog.
    • JSON & CSV: Both Blob and ADLS provide support for JSON and CSV formats, allowing flexibility with semi-structured data. However, they do not inherently support schema enforcement, ACID compliance, or governance features without Delta Lake.
  3. Delta Table Benefits:
    • Schema Evolution and Enforcement: Delta enables schema evolution, essential in big data environments.
    • Time Travel: Delta provides versioning, allowing access to past versions of data.
    • ACID Transactions: Delta ensures consistency and reliability in large-scale data processing.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

from_json(), to_json()

from_json()

from_json() is a function used to parse a JSON string into a structured DataFrame format (such as StructType, ArrayType, etc.). It is commonly used to deserialize JSON strings stored in a DataFrame column into complex types that PySpark can work with more easily.

Syntax

from_json(column, schema, options={})

Parameters

column: The column containing the JSON string. Can be a string that refers to the column name or a column object.

schema

Specifies the schema of the expected JSON structure.
Can be a StructType (or other types like ArrayType depending on the JSON structure).

options

  • allowUnquotedFieldNames: Allows field names without quotes. (default: false)
  • allowSingleQuotes: Allows parsing single-quoted JSON strings. (default: true)
  • allowNumericLeadingZeros: Allows leading zeros in numbers. (default: false)
  • allowBackslashEscapingAnyCharacter: Allows escaping any character with a backslash. (default: false)
  • mode: Controls how to handle malformed records (see the sketch after this list):
    PERMISSIVE: The default mode that sets null values for corrupted fields.
    DROPMALFORMED: Discards rows with malformed JSON strings.
    FAILFAST: Fails the query if any malformed records are found.
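
A small sketch of passing these options as a dict, assuming a DataFrame df with a json_string column and the schema defined in the next snippet (the option value shown is just an example):

from pyspark.sql.functions import from_json, col

# Parse JSON with an explicit mode for malformed records
df_parsed = df.withColumn(
    "parsed_json",
    from_json(col("json_string"), schema, {"mode": "PERMISSIVE"})
)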
Sample DF
+--------------------------------------------------------+
|json_string                                             |
+--------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|
+--------------------------------------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import from_json, col
# Define the schema for the nested JSON
schema = StructType([
    StructField("name", StructType([
        StructField("first", StringType(), True),
        StructField("last", StringType(), True)
    ]), True),
    StructField("age", IntegerType(), True)
])

# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))

# Display the parsed JSON
df_parsed.select("parsed_json.*").show(truncate=False)
+--------------+---+
|name          |age|
+--------------+---+
|{John, Doe}   |30 |
|{Alice, Smith}|25 |
+--------------+---+

to_json()

to_json() is a function that converts a structured column (such as one of type StructType, ArrayType, etc.) into a JSON string.

Syntax

to_json(column, options={})

Parameters

column: The column you want to convert into a JSON string.
The column should be of a complex data type, such as StructType, ArrayType, or MapType.
Can be a column name (as a string) or a Column object.

options

  • pretty: If set to true, it pretty-prints the JSON output.
  • dateFormat: Specifies the format for DateType and TimestampType columns (default: yyyy-MM-dd).
  • timestampFormat: Specifies the format for TimestampType columns (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
  • ignoreNullFields: When set to true, null fields are omitted from the resulting JSON string (default: true).
  • compression: Controls the compression codec used to compress the JSON output, e.g., gzip, bzip2.
sample data
+--------------------------------------------------------+
|json_string                                             |
+--------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|
+--------------------------------------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, to_json, col

# Parse the JSON string into structured columns (schema as defined above)
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))
df_parsed.show(truncate=False)

+--------------------------------------------------------+--------------------+
|json_string                                             |parsed_json         |
+--------------------------------------------------------+--------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |{{John, Doe}, 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|{{Alice, Smith}, 25}|
+--------------------------------------------------------+--------------------+
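
To actually serialize the struct back into a JSON string, a minimal sketch continuing from df_parsed above (the "back_to_json" column name is illustrative):

from pyspark.sql.functions import to_json, col

# Convert the parsed struct back into a JSON string column
df_json = df_parsed.withColumn("back_to_json", to_json(col("parsed_json")))
df_json.select("back_to_json").show(truncate=False)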

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Comparison: split (), concat (), arrays_zip (), and explode ()

split()
  • Description: Splits a string column into an array of substrings based on a delimiter.
  • Input type: String
  • Output type: Array of strings
  • Key use cases: Splitting strings on delimiters (e.g., comma-separated values).
  • Example: split(col("string_col"), ",") → ["a", "b", "c"]
  • Different lengths: Not applicable.
  • Null values: Splits even if the string contains null values (but may produce empty strings).

concat()
  • Description: Concatenates multiple arrays or strings into a single array or string.
  • Input type: Arrays/Strings
  • Output type: Array or String
  • Key use cases: Merging multiple arrays into one, or multiple strings into one.
  • Example: concat(col("array1"), col("array2")) → ["a", "b", "x", "y"]
  • Different lengths: If input arrays have different lengths, the shorter ones are concatenated as-is.
  • Null values: If arrays contain null, concat() still works, returning the non-null elements.

arrays_zip()
  • Description: Zips multiple arrays element-wise into a single array of structs.
  • Input type: Arrays
  • Output type: Array of structs
  • Key use cases: Aligning data from multiple arrays element-wise, treating each set of elements as a struct.
  • Example: arrays_zip(col("array1"), col("array2")) → [{'a', 1}, {'b', 2}]
  • Different lengths: If input arrays have different lengths, the shorter ones are padded with null.
  • Null values: Inserts null into the struct where an input array has null at the corresponding index.

explode()
  • Description: Flattens an array into multiple rows, with one row per element in the array.
  • Input type: Array
  • Output type: Multiple rows (with the original columns)
  • Key use cases: Flattening arrays for row-by-row processing (e.g., after zipping or concatenating arrays).
  • Example: explode(col("array_col")) → converts an array into separate rows.
  • Different lengths: Not applicable; each element becomes a separate row, regardless of length.
  • Null values: Preserves null elements during the explosion but still creates separate rows.

Breakdown:

  • split() is used to break a single string into an array of substrings.
  • concat() merges arrays or strings, resulting in a single array or string.
  • arrays_zip() aligns elements from multiple arrays, creating an array of structs.
  • explode() takes an array and converts it into multiple rows, one for each array element.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)