PySpark DataFrame

PySpark DataFrame is a distributed collection of rows, similar to a table in a relational database or a DataFrame in Python’s pandas library. It provides powerful tools for querying, transforming, and analyzing large-scale structured and semi-structured data.

PySpark apply functions

Apply a function to a column

df.withColumn("Upper_Name", upper(df.Name))
df.select("Seqno","Name", upper(df.Name))

df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB")

def upperCase(str):
    return str.upper()
upperCaseUDF = udf(upperCase,StringType())
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB")
collect ( )

collect() is an action operation that retrieves all the elements of the dataset (from all worker nodes) to the driver node.

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
dataCollect = deptDF.collect()
print(dataCollect)
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
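Each element returned by collect() is a Row object, so fields can be read by name on the driver. A minimal sketch using the dataCollect list above:

# Iterate over the collected Row objects on the driver
for row in dataCollect:
    print(row.dept_name + "," + str(row.dept_id))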
Column Class

Access column

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)
+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

Column Operators

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+
# Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

df.select(df.col2 < df.col3).show()
+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

df.select(df.col2 == df.col3).show()
+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+
Convert DataFrame to Pandas

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.

pandasDF = pysparkDF.toPandas()
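Because toPandas() pulls every record to the driver, it is safer to reduce the DataFrame first; a minimal sketch (the 1000-row limit is arbitrary):

# Convert only a small subset to avoid overloading the driver
pandasDF_small = pysparkDF.limit(1000).toPandas()
print(pandasDF_small.head())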
Convert RDD to DataFrame
df = rdd.toDF()
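toDF() also accepts column names; a minimal sketch, assuming an existing SparkSession named spark (the sample data is illustrative):

# Build an RDD of tuples and convert it to a DataFrame with named columns
rdd = spark.sparkContext.parallelize([("Finance", 10), ("Marketing", 20)])
df = rdd.toDF(["dept_name", "dept_id"])
df.show()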
Create an empty DataFrame

Create an empty DataFrame

#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create empty DataFrame from empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(emptyRDD,schema)
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
dropDuplicates, distinct ()

Key differences between distinct() and dropDuplicates()

  • distinct() considers all columns when identifying duplicates, while dropDuplicates() allows you to specify a subset of columns to determine uniqueness.
  • distinct() treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them is retained after applying distinct().
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |  #duplicated
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df.distinct().show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
Distinct count: 9
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

df2 = df.dropDuplicates(["department"])
print("Distinct count: "+str(df2.count()))
Distinct count: 3
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
+-------------+----------+------+
fillna() & fill()

DataFrame.fillna() and DataFrameNaFunctions.fill() are used to replace NULL/None values.

# Prepare Data
data = [("James", None, 3000), \
    ("Michael", "Sales", None), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", None), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", None, 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |null      |3000  |
|Michael      |Sales     |null  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |null  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |null      |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|      null|  3000|
|      Michael|     Sales|     0|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|     0|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|      null|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
#Replace 'unknown' for null only on the department column
df.na.fill(value="unknown",subset=["department"]).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|   unknown|  3000|
|      Michael|     Sales|  null|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  null|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|   unknown|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
groupBy ( )

groupBy(), similar to the SQL GROUP BY clause, is a transformation that groups rows having the same values in the specified columns into summary rows, so that aggregate functions (sum, count, avg, …) can be applied to each group.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|     Sales|   NY|     176000|     30000|
|     Sales|   CA|      81000|     23000|
|   Finance|   CA|     189000|     47000|
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
| Marketing|   CA|      80000|     18000|
+----------+-----+-----------+----------+
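groupBy() can also be combined with agg() to compute several aggregations at once; a minimal sketch on the same DataFrame (the aliases are illustrative, and the imported sum/avg/max shadow the Python built-ins):

from pyspark.sql.functions import sum, avg, max

# Several aggregations per department in one pass
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"),
         avg("salary").alias("avg_salary"),
         max("bonus").alias("max_bonus")) \
    .show(truncate=False)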
join ( )
  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame.
  • Right Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame.
  • Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows.
  • Left Semi Join: Returns all rows from the left DataFrame where there is a match in the right DataFrame.
  • Left Anti Join: Returns all rows from the left DataFrame where there is no match in the right DataFrame.
  • Self Join: Joins a DataFrame with itself, typically using aliases to distinguish the two copies.
# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
# Inner join
deptDF.join(empDF, deptDF.dept_id==empDF.emp_dept_id, "inner").show()
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
# Left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Returns only the columns of the left DataFrame, for rows that have a match in the right DataFrame.
# Left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Returns only the columns of the left DataFrame, for rows that have no match in the right DataFrame.
# Self join
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
orderBy( ) and sort( )

orderBy() and sort() can be used interchangeably; both return a DataFrame sorted by the specified columns.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
# Sorting different columns in different orders

df.sort("state", "age",ascending=[False,True]).show()
df.sort(df["state"].desc(), df["age"].asc()).show()
df.orderBy("state", "age",ascending=[False,True]).show()
df.orderBy(df["state"].desc(), df["age"].asc()).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+
partitionBy ( )

partitionBy() is a DataFrameWriter method used when writing a DataFrame to disk: it partitions the output into sub-folders based on the values of one or more columns (e.g. df.write.partitionBy("department").parquet(...)).
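A minimal sketch of a partitioned write, assuming the employee DataFrame used above; the output path is illustrative:

# Write the DataFrame partitioned by the state column;
# one sub-folder per distinct state value is created under the output path
df.write.partitionBy("state") \
    .mode("overwrite") \
    .parquet("/tmp/output/employees_by_state")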

pivot ( ) & Unpivot ( )

pivot() (Row to Column)

# Prepare data
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
# Output
root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

# Applying pivot()
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

# Output
root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

# one more example
pivotDF = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
root
 |-- Country: string (nullable = true)
 |-- Banana: long (nullable = true)
 |-- Beans: long (nullable = true)
 |-- Carrots: long (nullable = true)
 |-- Orange: long (nullable = true)

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|China  |400   |1500 |1200   |4000  |
|USA    |1000  |1600 |1500   |4000  |
|Mexico |null  |2000 |null   |null  |
|Canada |2000  |null |2000   |null  |
+-------+------+-----+-------+------+

Unpivot 

PySpark SQL doesn’t have an unpivot function, so the stack() function is used instead.

# Applying unpivot()
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+
sample(), sampleBy()

sample() is a mechanism to get random sample records from the dataset.

Syntax

sample(withReplacement, fraction, seed=None)

  • fraction – Fraction of rows to generate, range [0.0, 1.0]. Note that it doesn’t guarantee to provide the exact number of the fraction of records.
  • seed – Seed for sampling (default a random seed). Used to reproduce the same random sampling.
  • withReplacement – Sample with replacement or not (default False).
df=spark.range(100)
print(df.sample(0.06).collect())

#Output: [Row(id=0), Row(id=2), Row(id=17), Row(id=25), Row(id=26), Row(id=44), Row(id=80)]

In the example above, the DataFrame has 100 records and a 6% sample (6 records) was requested, but sample() returned 7 records. This shows that sample() does not return the exact fraction specified.

To get the same random sample on every run, use the same seed value; change the seed to get different results.

print(df.sample(0.1,123).collect())
#Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,123).collect())
#Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,456).collect())
#Output: 19,21,42,48,49,50,75,80

sampleBy()

sampleBy(col, fractions, seed=None)

df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

#Output: [Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]
select ( )
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+
# Select columns by different ways
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

# By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Nested Struct Columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+
# Select child columns
df2.select("name.firstname","name.lastname").show(truncate=False)
+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+
# Select all child columns
df2.select("name.*").show(truncate=False)
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+
show ( )
df.show()
+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+
df.show(truncate=False)
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)
-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
only showing top 3 rows
StructType & StructField

StructType

Defines the structure of the DataFrame. StructType represents a schema, which is a collection of StructField objects. A StructType is essentially a list of fields, each with a name and data type, defining the structure of the DataFrame. It allows for the creation of nested structures and complex data types.

StructField

StructField – Defines the metadata of the DataFrame column. It represents a field in the schema, containing metadata such as the name, data type, and nullable status of the field. Each StructField object defines a single column in the DataFrame, specifying its name and the type of data it holds.

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

nested StructType

# nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+
transform ( )

transform() chains custom transformation functions on a DataFrame: each function receives the DataFrame as its first argument and returns a new DataFrame, so several custom transformations can be applied in sequence.

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# PySpark transform() Usage
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount)
df2.show()
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+
UDF

UDF (User Defined Function)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
#create a python function
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 
#Convert a Python function to PySpark UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z),StringType())
# or
convertUDF = udf(convertCase,StringType())

df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

Registering PySpark UDF & use it on SQL

In order to use convertCase() function on PySpark SQL, you need to register the function with PySpark by using spark.udf.register().

spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
union ( ) & unionAll ( )

union ( ) & unionAll ( ) are the same result. unionAll is older, retired


df1
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+
df2
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
df1.union(df2).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|<--duplicated
|Maria        |Finance   |CA   |90000 |24 |23000|<--duplicated
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
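Because union() keeps duplicates, chain distinct() (or dropDuplicates()) when a de-duplicated result is needed; a minimal sketch:

# Merge the two DataFrames and remove the duplicated rows
df1.union(df2).distinct().show()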
unionByName ( )

df1.unionByName(df2, allowMissingColumns=True)
The schemas and column order can differ between df1 and df2.

df1
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+
df2
+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+
# different columns order
df3 = df1.unionByName(df2)
df3.show()
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+
# different columns name and order 
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+
df1.unionByName(df2, allowMissingColumns=True).show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+
where() & filter()

where() & filter() can replace each other

  • Use &, |, ~ for logical operations (AND, OR, NOT).
  • Use ==, !=, >, <, >=, <= for comparisons.
  • Always wrap column references in col() for clarity.
  • For SQL-like patterns, consider using functions like like, isin, and between.
  • IS NULL –> “isNull ( )”
  • IS NOT NULL –> “isNotNull ( )”
  • LIKE –> “like ( %abc% )”
  • IN –> “isin (18, 21, 25)”
  • BETWEEN –> “between(18, 25)”
+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
df.select("gender").filter(df.gender == "M").show()
df.select("gender").where(df.gender == "F").show()
+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     M|
+------+

+------+
|gender|
+------+
|     F|
|     F|
+------+
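The operators and SQL-like helpers listed above (isin, like, isNull, …) can be combined in a single condition; a minimal sketch against the same DataFrame:

from pyspark.sql.functions import col, array_contains

# AND of two conditions (use & / | / ~ and wrap each condition in parentheses)
df.filter((df.state == "OH") & (df.gender == "M")).show(truncate=False)

# SQL-like helpers
df.filter(col("state").isin("OH", "NY")).show(truncate=False)
df.filter(col("name.lastname").like("%Wil%")).show(truncate=False)
df.filter(array_contains(df.languages, "Java")).show(truncate=False)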
withColumn()
from pyspark.sql.functions import date_add, col
df.withColumn("dob", date_add("dob", 10)).\
withColumn("newsalary",col("salary")*100).\
drop("middlename").show()
+---------+--------+----------+------+------+---------+
|firstname|lastname|       dob|gender|salary|newsalary|
+---------+--------+----------+------+------+---------+
|    James|   Smith|1991-04-11|     M|   300|    30000|
|  Michael|        |2000-05-29|     M|   400|    40000|
|   Robert|Williams|1978-09-15|     M|   400|    40000|
|    Maria|   Jones|1967-12-11|     F|   400|    40000|
|      Jen|   Brown|1980-02-27|     F|    -1|     -100|
+---------+--------+----------+------+------+---------+
withColumnRenamed ( )

withColumnRenamed() renames a DataFrame column; it is often used to rename one, several, or all columns of a PySpark DataFrame.

+--------------------+----------+------+------+
|                name|       dob|gender|salary|
+--------------------+----------+------+------+
|    {James, , Smith}|1991-04-01|     M|  3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|    -1|
+--------------------+----------+------+------+
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.show()
+--------------------+-----------+------+-------------+
|                name|DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|    {James, , Smith}| 1991-04-01|     M|         3000|
|   {Michael, Rose, }| 2000-05-19|     M|         4000|
|{Robert, , Williams}| 1978-09-05|     M|         4000|
|{Maria, Anne, Jones}| 1967-12-01|     F|         4000|
|  {Jen, Mary, Brown}| 1980-02-17|     F|           -1|
+--------------------+-----------+------+-------------+


deltaTable vs DataFrames

In Databricks and PySpark, DeltaTables and DataFrames both handle structured data but differ in functionality and use cases. Here’s a detailed comparison:

Definitions

DeltaTable

A DeltaTable is a storage format based on Apache Parquet, with support for ACID transactions, versioning, schema enforcement, and advanced file operations. It is managed by the Delta Lake protocol, offering features like time travel, upserts, and deletion.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is an abstraction for structured and semi-structured data in Spark. It is a purely in-memory abstraction and does not directly manage storage or transactions.

Features

Feature | DeltaTable | DataFrame
Persistence | Stores data on disk in a managed format. | Primarily an in-memory abstraction (ephemeral).
Schema Enforcement | Enforces schema when writing/updating. | No schema enforcement unless explicitly specified.
ACID Transactions | Supports atomic writes, updates, and deletes. | Not transactional; changes require reprocessing.
Versioning | Maintains historical versions (time travel). | No versioning; a snapshot of data.
Upserts and Deletes | Supports MERGE, UPDATE, and DELETE. | Does not directly support these operations.
Performance | Optimized for storage (Z-order indexing, compaction). | Optimized for in-memory transformations.
Time Travel | Query historical data using snapshots. | No time travel support.
Indexing | Supports indexing (Z-order, data skipping). | No indexing capabilities.

Use Cases

DeltaTable

Ideal for persistent storage with advanced capabilities:

  • Data lakes or lakehouses.
  • ACID-compliant operations (e.g., MERGE, DELETE).
  • Time travel to access historical data.
  • Optimizing storage with compaction or Z-ordering.
  • Schema evolution during write operations.

DataFrame

Best for in-memory processing and transformations:

  • Ad-hoc queries and ETL pipelines.
  • Working with data from various sources (files, databases, APIs).
  • Temporary transformations before persisting into Delta or other formats.

Common APIs

DeltaTable

Load Delta table from a path:

from delta.tables import DeltaTable 
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

Merge data:

delta_table.alias("target").merge( 
source_df.alias("source"), 
"target.id = source.id" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Time Travel:

df = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta/table")

Optimize

OPTIMIZE '/path/to/delta/table' ZORDER BY (column_name);
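Depending on the Delta Lake release, the same operation is also exposed through the Python API; a sketch assuming delta-spark 2.0+ where DeltaTable.optimize() is available:

# Compact small files and Z-order by a column using the Python API
delta_table.optimize().executeZOrderBy("column_name")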

DataFrame

Read

df = spark.read.format("parquet").load("/path/to/data")

Transformations

transformed_df = df.filter(df.age > 30).groupBy("gender").count()

Write

df.write.format("delta").save("/path/to/save")

Transition Between DeltaTables and DataFrames

Convert DeltaTable to DataFrame:

df = delta_table.toDF()

Write DataFrame to Delta format:

df.write.format("delta").save("/path/to/delta/table")


Summary of Dataframe Methods

Summary of Dataframe Methods

Category | Method | Example
Inspection | printSchema() | df.printSchema()
 | columns | df.columns
Selection | select() | df.select("col1", "col2").show()
 | withColumn() | df.withColumn("new_col", col("col1") + 1)
 | withColumnRenamed() | df.withColumnRenamed("old", "new")
 | distinct() | df.select("cut").distinct().show()
 | take(5) | df.take(5)  # Retrieve the first 5 rows
 | drop() | df.drop("col").show()  # drop a column
Filtering | filter(), where() | df.filter(df.col1 > 10).show()  /  df.where(df.col1 > 10).show()
Aggregations | groupBy().agg() | df.groupBy("col").agg(sum("val")).show()
 | count() | df.count()
Joins | join() | df1.join(df2, df1.id == df2.id, "inner")
 | Left Join | df1.join(df2, df1.id == df2.id, "left_outer").show()
Sorting | orderBy(), sort() | df.orderBy("col1").show()  /  df.sort(df.col1.desc()).show()
Null Handling | dropna(), fillna() | df.fillna({"col1": 0}).show()
 | isNotNull() | df.filter(col("cut").isNotNull()).show()
 | isNull() | df.filter(col("cut").isNull()).show()
 | dropna() | df.dropna(subset=["col1"]).show()
Date/Time | year(), month(), dayofmonth() | df.withColumn("year", year("date_col"))
Writing | write.format() | df.write.csv("path/to/csv", header=True)  /  df.write.json("path/to/json")
Save as Table | write.format().mode().saveAsTable() | df.write.format("delta").saveAsTable("my_table")
Create as View | createOrReplaceTempView() | df.createOrReplaceTempView("temp_view_name")
 | createOrReplaceGlobalTempView() | df.createOrReplaceGlobalTempView("global_temp_view_name")
String Ops | upper(), concat() | df.withColumn("upper", upper("col"))
Partitioning | partitionBy("col") | df.write.partitionBy("department").parquet("output/parquet_data")
 | repartition(4) | df.repartition(4)  # Repartition into 4 partitions
 | coalesce(2) | df.coalesce(2)  # Reduce to 2 partitions


arrayType, mapType column and functions

In PySpark, ArrayType and MapType are used to define complex data structures within a DataFrame schema.

ArrayType column, and functions,

ArrayType allows you to store and work with arrays, which can hold multiple values of the same data type.

sample dataframe:
id | numbers
1  | [1, 2, 3]
2  | [4, 5, 6]
3  | [7, 8, 9]

explode ()

explode() turns a given array into individual new rows; it is often used to flatten JSON.

from pyspark.sql.functions import explode

# Explode the 'numbers' array into separate rows
exploded_df = df.withColumn("number", explode(df.numbers))
display(exploded_df)
==output==
id	numbers	number
1	[1,2,3]	1
1	[1,2,3]	2
1	[1,2,3]	3
2	[4,5,6]	4
2	[4,5,6]	5
2	[4,5,6]	6
3	[7,8,9]	7
3	[7,8,9]	8
3	[7,8,9]	9
split ()

Splits a string based on a specified delimiter and returns an array type.

from pyspark.sql.functions import split
df.withColumn("Name_Split", split(df["Name"], ","))

sample dataframe
+-------------+
|Name         |
+-------------+
|John,Doe     |
|Jane,Smith   |
|Alice,Cooper |
+-------------+

from pyspark.sql.functions import split
# Split the 'Name' column by comma
df_split = df.withColumn("Name_Split", split(df["Name"], ","))

==output==
+-------------+----------------+
| Name        | Name_Split     |
+-------------+----------------+
| John,Doe    | [John, Doe]    |
| Jane,Smith  | [Jane, Smith]  |
| Alice,Cooper| [Alice, Cooper]|
+-------------+----------------+
array ()

Creates an array column.

from pyspark.sql.functions import array, col
data=[(1,2,3),(4,5,6)]
schema=['num1','num2','num3']
df1=spark.createDataFrame(data,schema)
df1.show()
# create a new column - numbers, array type. elements use num1,num2,num3   
df1.withColumn("numbers",array(col("num1"),col("num2"),col("num3"))).show()

==output==
+----+----+----+
|num1|num2|num3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

#new array column "numbers" created
+----+----+----+-----------+
|num1|num2|num3| numbers   |
+----+----+----+-----------+
|   1|   2|   3| [1, 2, 3] |
|   4|   5|   6| [4, 5, 6] |
+----+----+----+-----------+
array_contains ()

Checks if an array contains a specific element.

from pyspark.sql.functions import array_contains
array_contains(array, value)

sample dataframe
+---+-----------------------+
|id |fruits                 |
+---+-----------------------+
|1  |[apple, banana, cherry]|
|2  |[orange, apple, grape] |
|3  |[pear, peach, plum]    |
+---+-----------------------+

from pyspark.sql.functions import array_contains

# Using array_contains to check if the array contains 'apple'
df.select("id", array_contains("fruits", "apple").alias("has_apple")).show()

==output==
+---+----------+
| id|has_apple |
+---+----------+
|  1|      true|
|  2|      true|
|  3|     false|
+---+----------+
getItem()

Access individual elements of an array by their index using the getItem() method

# Select the second element (index start from 0) of the 'numbers' array
df1 = df.withColumn("item_1_value",   df.numbers.getItem(1))
display(df1)
==output==
id	numbers	      item_1_value
1	[1,2,3]	       2
2	[4,5,6]	       5
3	[7,8,9]	       8
size ()

Returns the size of the array.

from pyspark.sql.functions import size

# Get the size of the 'numbers' array
df.select(size(df.numbers)).show()

==output==
+-------------+
|size(numbers)|
+-------------+
|            3|
|            3|
|            3|
+-------------+
sort_array()

Sorts the array elements.

sort_array(col: 'ColumnOrName', asc: bool = True)

If asc is True (the default) the sort is ascending; if False, descending. When asc=True it can be omitted.

from pyspark.sql.functions import sort_array
df.withColumn("numbers", sort_array("numbers")).show()
==output==
ascending 
+---+---------+
| id|  numbers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+
df.select(sort_array("numbers", asc=False).alias("sorted_desc")).show()
==output==
descending 
+-----------+
|sorted_desc|
+-----------+
|  [3, 2, 1]|
|  [6, 5, 4]|
|  [9, 8, 7]|
+-----------+
concat ()

concat() is used to concatenate arrays (or strings) into a single array (or string). When dealing with ArrayType, concat() is typically used to combine two or more arrays into one.

from pyspark.sql.functions import concat
concat(*cols)

sample DataFrames
+---+-------+-------+
|id |array1 |array2 |
+---+-------+-------+
|1  |[a, b] |[x, y] |
|2  |[c]    |[z]    |
|3  |[d, e] |null   |
+---+-------+-------+

from pyspark.sql.functions import concat

# Concatenating array columns
df_concat = df.withColumn("concatenated_array", concat(col("array1"), col("array2")))
df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |null              |
+---+------+------+------------------+

Handling null Values

If any of the input columns are null, the entire result can become null. This is why you’re seeing null instead of just the non-null array.

To handle this, you can use coalesce() to substitute null with an empty array before performing the concat(). coalesce() returns the first non-null argument. Here’s how you can modify your code:

from pyspark.sql.functions import concat, coalesce, lit, array

# Define an empty array to substitute for null values
empty_array = array()

# Concatenate with null handling using coalesce
df_concat = df.withColumn(
    "concatenated_array",
    concat(coalesce(col("array1"), empty_array), coalesce(col("array2"), empty_array))
)

df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |[d, e]            |
+---+------+------+------------------+
arrays_zip ()

Combines arrays into a single array of structs.
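A minimal sketch, using hypothetical array columns nums and letters:

from pyspark.sql.functions import arrays_zip, col

# Zip two array columns element-wise into one array of structs
data = [(1, [1, 2, 3], ["a", "b", "c"])]
df_zip = spark.createDataFrame(data, ["id", "nums", "letters"])
df_zip.select("id", arrays_zip(col("nums"), col("letters")).alias("zipped")) \
      .show(truncate=False)
# zipped -> [{1, a}, {2, b}, {3, c}]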


MapType column, and functions

MapType is used to represent a map of key-value pairs, similar to a Python dictionary (dict).

from pyspark.sql.types import MapType, StringType, IntegerType
# Define a MapType
my_map = MapType(StringType(), IntegerType(), valueContainsNull=True)

Parameters:

  • keyType: Data type of the keys in the map. You can use PySpark data types like StringType(), IntegerType(), DoubleType(), etc.
  • valueType: Data type of the values in the map. It can be any valid PySpark data type
  • valueContainsNull: Boolean flag (optional). It indicates whether null values are allowed in the map. Default is True.

sample dataset
# Sample dataset (Product ID and prices in various currencies)
data = [
    (1, {"USD": 100, "EUR": 85, "GBP": 75}),
    (2, {"USD": 150, "EUR": 130, "GBP": 110}),
    (3, {"USD": 200, "EUR": 170, "GBP": 150}),
]


sample dataframe
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
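A sketch of how this DataFrame can be built with an explicit MapType schema (assuming the spark session and the sample data above):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType

# product_id plus a map of currency code -> price
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("prices", MapType(StringType(), IntegerType()), True)
])
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(truncate=False)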

Accessing map_keys (), map_values ()

Extract keys (currency codes) and values (prices):

from pyspark.sql.functions import col, map_keys, map_values
# Extract map keys and values
df.select(
    col("product_id"),
    map_keys(col("prices")).alias("currencies"),
    map_values(col("prices")).alias("prices_in_currencies")
).show(truncate=False)

==output==
+----------+---------------+--------------------+
|product_id|currencies     |prices_in_currencies|
+----------+---------------+--------------------+
|1         |[EUR, GBP, USD]|[85, 75, 100]       |
|2         |[EUR, GBP, USD]|[130, 110, 150]     |
|3         |[EUR, GBP, USD]|[170, 150, 200]     |
+----------+---------------+--------------------+
explode ()

Use explode () to flatten the map into multiple rows, where each key-value pair from the map becomes a separate row.

from pyspark.sql.functions import explode
# Use explode to flatten the map
df_exploded = df.select("product_id", explode("prices").alias("currency", "price"))
df_exploded.show()

==output==
+----------+--------+-----+
|product_id|currency|price|
+----------+--------+-----+
|         1|     EUR|   85|
|         1|     GBP|   75|
|         1|     USD|  100|
|         2|     EUR|  130|
|         2|     GBP|  110|
|         2|     USD|  150|
|         3|     EUR|  170|
|         3|     GBP|  150|
|         3|     USD|  200|
+----------+--------+-----+
Accessing specific elements in the map

To get the price for a specific currency (e.g., USD) for each product:

from pyspark.sql.functions import col, map_keys, map_values
# Access the value for a specific key in the map 
df.select(
    col("product_id"),
    col("prices").getItem("USD").alias("price_in_usd")
).show(truncate=False)

==output==
+----------+------------+
|product_id|price_in_usd|
+----------+------------+
|1         |100         |
|2         |150         |
|3         |200         |
+----------+------------+
filtering

filter the rows based on conditions involving the map values

from pyspark.sql.functions import col, map_keys, map_values
# Filter rows where price in USD is greater than 150
df.filter(col("prices").getItem("USD") > 150).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
map_concat ()

Combines two or more map columns by merging their key-value pairs.

from pyspark.sql.functions import map_concat, create_map, lit

# Define the additional currency as a new map using create_map()
additional_currency = create_map(lit("CAD"), lit(120))

# Add a new currency (e.g., CAD) with a fixed price to all rows
df.withColumn(
    "updated_prices",
    map_concat(col("prices"), additional_currency)
).show(truncate=False)

==output==
+----------+------------------------------------+------------------------------------------------+
|product_id|prices                              |updated_prices                                  |
+----------+------------------------------------+------------------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |{EUR -> 85, GBP -> 75, USD -> 100, CAD -> 120}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|{EUR -> 130, GBP -> 110, USD -> 150, CAD -> 120}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|{EUR -> 170, GBP -> 150, USD -> 200, CAD -> 120}|
+----------+------------------------------------+------------------------------------------------+


withColumn, select

withColumn()

It is a transformation.
withColumn() adds or replaces a column in a DataFrame: if column_name already exists, the existing column is replaced; otherwise it is added as a new column.

Syntax:

from pyspark.sql.functions import col, lit, concat, when, upper, coalesce
df.withColumn("column_name", expression)

Key Parameters

  • "column_name": The name of the new or existing column.
  • expression: Any transformation, calculation, or function applied to create or modify the column.
Basic Column Creation (with literal values)

from pyspark.sql.functions import lit
# Add a column with a constant value
df_new = df.withColumn("New_Column", lit(100))

===output===
ID Name    Grade New_Column
1  Alice   null  100
2  Bob     B     100
3  Charlie C     100

# Add a new column, no value
df_new = df.withColumn("New_Column", lit(None))

===output===
ID Name    Grade New_Column
1  Alice   null  null
2  Bob     B     null
3  Charlie C     null

Arithmetic Operation

from pyspark.sql.functions import col
# Create a new column based on arithmetic operations
df_arithmetic = df.withColumn("New_ID", col("ID") * 2 + 5)
df_arithmetic.show()
===output===
ID Name    Grade New_ID
1  Alice   null  7
2  Bob     B     9
3  Charlie C     11

Using SQL Function

you can use functions like concat(), substring(), when(), length(), etc.

from pyspark.sql.functions import concat, lit
# Concatenate two columns with a separator
df_concat = df.withColumn("Full_Description", concat(col("Name"), lit(" has ID "), col("ID")))

Conditional Logic

Using when() and otherwise() is equivalent to SQL’s CASE WHEN expression.

from pyspark.sql.functions import when

# Add a new column with conditional logic
df_conditional = df.withColumn("Is_Adult", when(col("ID") > 18, "Yes").otherwise("No"))
df_conditional.show()
String Function

You can apply string functions like upper(), lower(), or substring()

from pyspark.sql.functions import upper
# Convert a column to uppercase
df_uppercase = df.withColumn("Uppercase_Name", upper(col("Name")))
df_uppercase.show()

Type Casting

You can cast a column to a different data type.

Cast the ID column to a string

# Cast the ID column to a string
df_cast = df.withColumn("ID_as_string", col("ID").cast("string"))
df_cast.show()

Handling Null Values

create columns that handle null values using coalesce() or fill()

Coalesce:

This function returns the first non-null value among its arguments.

from pyspark.sql.functions import coalesce
# Return non-null value between two columns
df_coalesce = df.withColumn("NonNullValue", coalesce(col("Name"), lit("Unknown")))
df_coalesce.show()

Fill Missing Values:

# Replace nulls in a column with a default value
df_fill = df.na.fill({"Name": "Unknown"})
df_fill.show()

Creating Columns with Complex Expressions

create columns based on more complex expressions or multiple transformations at once.

# Create a column with multiple transformations
df_complex = df.withColumn("Complex_Column", concat(upper(col("Name")), lit("_"), col("ID").cast("string")))
df_complex.show(truncate=False)

example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, when, upper, coalesce

# Initialize Spark session
spark = SparkSession.builder.appName("withColumnExample").getOrCreate()

# Create a sample DataFrame
data = [(1, "Alice", None), (2, "Bob", "B"), (3, "Charlie", "C")]
df = spark.createDataFrame(data, ["ID", "Name", "Grade"])

# Perform various transformations using withColumn()
df_transformed = df.withColumn("ID_Multiplied", col("ID") * 10) \
                   .withColumn("Full_Description", concat(upper(col("Name")), lit(" - ID: "), col("ID"))) \
                   .withColumn("Pass_Status", when(col("Grade") == "C", "Pass").otherwise("Fail")) \
                   .withColumn("Non_Null_Grade", coalesce(col("Grade"), lit("N/A"))) \
                   .withColumn("ID_as_String", col("ID").cast("string"))

# Show the result
df_transformed.show(truncate=False)
==output==
ID	Name	Grade	ID_Multiplied	Full_Description	Pass_Status	Non_Null_Grade	ID_as_String
1	Alice	null	10	ALICE - ID: 1	Fail	N/A	1
2	Bob	B	20	BOB - ID: 2	Fail	B	2
3	Charlie	C	30	CHARLIE - ID: 3	Pass	C	3

select ()

select () is used to project (select) a set of columns or expressions from a DataFrame. This function allows you to choose and work with specific columns, create new columns, or apply transformations to the data.

Syntax

DataFrame.select(*cols)
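
In its simplest form, select() projects columns by name or by Column object; a quick sketch using the ID/Name columns from the sample DataFrame above:

from pyspark.sql.functions import col
# Project two columns by name
df.select("ID", "Name").show()
# Equivalent forms using Column objects
df.select(df["ID"], col("Name")).show()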

Commonly Used PySpark Functions with select()

  • col(column_name): Refers to a column.
  • alias(new_name): Assigns a new name to a column.
  • lit(value): Adds a literal value.
  • round(column, decimals): Rounds off the values in a column.
  • concat(col1, col2, ...): Concatenates multiple columns.
  • when(condition, value): Adds conditional logic.
Renaming Columns:
df.select(df["column1"].alias("new_column1"), df["column2"]).show()

Using Expressions:
from pyspark.sql import functions as F
df.select(F.col("column1"), F.lit("constant_value"), (F.col("column2") + 10).alias("modified_column2")).show()

Performing Calculations
df.select((df["column1"] * 2).alias("double_column1"), F.round(df["column2"], 2).alias("rounded_column2")).show()

Handling Complex Data Types (Struct, Array, Map):
df.select("struct_column.field_name").show()
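
Array and map columns can be handled in a similar way with getItem() and explode(); a minimal sketch with hypothetical array_column and map_column columns:

from pyspark.sql.functions import col, explode
# Pick the first array element and a map value by key
df.select(col("array_column").getItem(0).alias("first_element"),
          col("map_column").getItem("some_key").alias("value_for_key")).show()
# Flatten an array column into one row per element
df.select(explode(col("array_column")).alias("element")).show()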

Selecting with Wildcards:

While PySpark doesn’t support SQL-like wildcards directly, you can achieve similar functionality using selectExpr (discussed below) or other methods like looping over df.columns.

df.select([c for c in df.columns if "some_pattern" in c]).show()

Using selectExpr ()

df.selectExpr("column1", "column2 + 10 as new_column2").show()

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Pyspark: read and write a csv file

In PySpark, we can read from and write to CSV files using DataFrameReader and DataFrameWriter with the csv method. Here’s a guide on how to work with CSV files in PySpark:

Reading CSV Files in PySpark

Syntax

df = spark.read.format("csv").options(**options).schema(schema_df).load(file_location)

format
  • csv
  • Parquet
  • ORC
  • JSON
  • AVRO
option
  • header = "true" / "false"
  • inferSchema = "true" / "false"
  • sep = "," (or any other delimiter)
file_location
  • load(path1)
  • load([path1, path2, ...])
  • load(folder)
Schema
  • define a schema explicitly (a StructType or a DDL string) and pass it with .schema(my_schema)

define a schema


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # Column1 is Integer, nullable
    StructField("column2", StringType(), True),    # Column2 is String, nullable
    StructField("column3", StringType(), False)    # Column3 is String, non-nullable
])

#or simple format
schema="col1 INTEGER, col2 STRING, col3 STRING, col4 INTEGER"
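
Either form can then be passed to the reader through .schema(); a short sketch with a hypothetical file path:

# Apply the explicit schema instead of inferring it
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)                  # accepts a StructType or a DDL string
      .load("path/to/input_file.csv"))
df.printSchema()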

Example

Read CSV file with header, infer schema, and specify null value


# Read a CSV file with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")        # Use the first row as the header
      .option("inferSchema", "true")   # Automatically infer the schema
      .option("sep", ",")              # Specify the delimiter
      .option("nullValue", "NULL")     # Define a string representation of null
      .load("path/to/input_file.csv")) # Load the file


# Read multiple CSV files with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load(["path/file1.csv", "path/file2.csv", "path/file3.csv"]))


# Read all CSV files in a folder with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load("/path_folder/"))

When you want to read multiple files into a single DataFrame but their schemas differ, load the files into separate DataFrames and then merge them in an additional step.
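
One way to merge DataFrames whose schemas differ is unionByName() with allowMissingColumns=True (available since Spark 3.1); a minimal sketch with hypothetical paths:

# Load the files separately, then merge by column name
df1 = spark.read.format("csv").option("header", "true").load("path/file1.csv")
df2 = spark.read.format("csv").option("header", "true").load("path/file2.csv")
# Columns missing from one side are filled with nulls
merged = df1.unionByName(df2, allowMissingColumns=True)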

Writing CSV Files in PySpark

Syntax


df.write.format("csv").options(**options).save("path/to/output_directory")

Example


# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("path/to/output_directory")



# Write DataFrame to a CSV file with header, partitioning, and compression

(df.write.format("csv")
    .option("header", "true")           # Write the header
    .option("compression", "gzip")      # Use gzip compression
    .partitionBy("year", "month")       # Partition the output by specified columns
    .mode("overwrite")                  # Overwrite existing data
    .save("path/to/output_directory"))  # Specify output directory

By default, if you try to write directly to a file (e.g., name1.csv), it conflicts because Spark doesn’t generate a single file but a collection of part files in a directory.

path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

At this point, name1.csv in “dbfs:/FileStore/name1.csv” is treated as a directory rather than a file name. PySpark writes the single file inside it with a generated name like part-00000-<id>.csv.

dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

We need to take an additional step: rename the file to name1.csv.

# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        destination_file = "dbfs:/FileStore/name1.csv"
        
        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break


display(dbutils.fs.ls("dbfs:/FileStore/"))

Direct Write with Custom File Name

PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

Steps:

  1. Use coalesce(1) to combine all data into a single partition.
  2. Save the file to a temporary location.
  3. Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)

Please do not hesitate to contact me if you have any questions at William . chen @mainri.ca 

(remove all space from the email account 😊)

spark: RDD, Dataframe, Dataset, Transformation and Action

In Apache Spark, RDD, DataFrame, and Dataset are the core data structures used to handle distributed data. Each of these offers different levels of abstraction and optimization.

Basic Overview

Definition
  • RDD: Low-level abstraction for distributed data (objects). RDDs (Resilient Distributed Datasets) are the fundamental data structure in Spark, representing an immutable distributed collection of objects. They provide a low-level interface to Spark, allowing developers to perform operations directly on data in a functional programming style.
  • DataFrame: High-level abstraction for structured data (table). DataFrames are similar to tables in relational databases and are used to represent structured data. They allow you to perform complex queries and operations on structured datasets with a SQL-like syntax.
  • Dataset: Combines RDD and DataFrame, and adds type safety (Scala/Java). Datasets are an extension of DataFrames that provide the benefits of both RDDs and DataFrames, with compile-time type safety. They are particularly useful in Scala and Java for ensuring data types during compile time.

API
  • RDD: Functional (map, filter, etc.). The RDD API provides functional programming constructs, allowing transformations and actions through functions like map, filter, and reduce.
  • DataFrame: SQL-like API plus relational functions. The DataFrame API includes both SQL-like queries and relational functions, making it more user-friendly and easier to work with structured data.
  • Dataset: Typed API plus relational functions (strong typing in Scala/Java). The Dataset API combines typed operations with the ability to perform relational functions, allowing for a more expressive and type-safe programming model in Scala/Java.

Data Structure
  • RDD: Collection of elements (e.g., objects, arrays). RDDs are essentially collections of objects, which can be of any type (primitive or complex). This means that users can work with various data types without a predefined schema.
  • DataFrame: Distributed table with named columns. DataFrames represent structured data as a distributed table, where each column has a name and a type. This structured format makes it easier to work with large datasets and perform operations.
  • Dataset: Typed distributed table with named columns. Datasets also represent data in a structured format, but they enforce types at compile time, enhancing safety and performance, especially in statically typed languages like Scala and Java.

Schema
  • RDD: No schema.
  • DataFrame: Defined schema (column names).
  • Dataset: Schema with compile-time type checking (Scala/Java).

Performance
  • RDD: Less optimized (no Catalyst/Tungsten).
  • DataFrame: Highly optimized (Catalyst/Tungsten).
  • Dataset: Highly optimized (Catalyst/Tungsten).

Transformations and Actions

Transformations and Actions are two fundamental operations used to manipulate distributed data collections like RDDs, DataFrames, and Datasets.

“Actions” are operations that return raw values; in other words, any RDD function that returns something other than RDD[T] is considered an action in Spark programming.

“Transformations” are lazy, meaning they do not get executed right away; action functions trigger the execution of the transformations.

High-Level Differences

  • Transformations: These are lazy operations that define a new RDD, DataFrame, or Dataset by applying a function to an existing one. However, they do not immediately execute—Spark builds a DAG (Directed Acyclic Graph) of all transformations.
  • Actions: These are eager operations that trigger execution by forcing Spark to compute and return results or perform output operations. Actions break the laziness and execute the transformations, as the short sketch after this list shows.
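
A minimal sketch of this behavior, assuming the small ID/Name DataFrame used earlier: the filter() and select() calls only build the plan, and nothing runs on the cluster until count() is called:

# Transformations: lazily build the execution plan, no job runs yet
filtered = df.filter(df.ID > 1).select("Name")
# Action: triggers execution of the whole plan and returns a value to the driver
print(filtered.count())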

Key Differences Between Transformations and Actions

Definition
  • Transformations: Operations that define a new dataset based on an existing one, but do not immediately execute.
  • Actions: Operations that trigger the execution of transformations and return results or perform output.

Lazy Evaluation
  • Transformations: Yes, transformations are lazily evaluated and only executed when an action is called.
  • Actions: No, actions are eager and immediately compute the result by triggering the entire execution plan.

Execution Trigger
  • Transformations: Do not trigger computation immediately. Spark builds a DAG of transformations to optimize execution.
  • Actions: Trigger the execution of the transformations and cause Spark to run jobs and return/output data.

Return Type
  • Transformations: Return a new RDD, DataFrame, or Dataset (these are still “recipes” and not materialized).
  • Actions: Return a result to the driver program (like a value) or write data to an external storage system.

Example Operations
  • Transformations: map, filter, flatMap, join, groupBy, select, agg, orderBy.
  • Actions: count, collect, first, take, reduce, saveAsTextFile, foreach.

Conclusion

  • Transformations are used to define what to do with the data but don’t execute until an action triggers them.
  • Actions are used to retrieve results or perform output, forcing Spark to execute the transformations.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Partition in databricks

In Databricks, partitioning is a strategy used to organize and store large datasets into smaller, more manageable chunks based on specific column values. Partitioning can improve query performance and resource management when working with large datasets in Spark, especially in distributed environments like Databricks.

Key Concepts of Partitioning in Databricks

Partitioning in Tables:

When saving a DataFrame as a table or Parquet file in Databricks, you can specify partitioning columns to divide the data into separate directories. Each partition contains a subset of the data based on the values of the partitioning column(s).
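
As a sketch (the table and column names here are hypothetical), partitionBy() on the writer creates one sub-directory per distinct value of the partition column:

# Save a DataFrame as a table partitioned by year
df.write.partitionBy("year").mode("overwrite").saveAsTable("sales_partitioned")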

Partitioning in DataFrames

Spark partitions data in-memory across nodes in the cluster to parallelize processing. Partitioning helps distribute the workload evenly across the cluster.
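
You can inspect and adjust the in-memory partitioning directly; a small sketch (the column name is illustrative):

# Check how many in-memory partitions the DataFrame currently has
print(df.rdd.getNumPartitions())
# Redistribute into 8 partitions, hash-partitioned by a column
df_repart = df.repartition(8, "gender")
print(df_repart.rdd.getNumPartitions())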

Types of Partitioning

Static Partitioning (Manual Partitioning)

When saving or writing data to a file or table, you can manually specify one or more columns to partition the data by. This helps when querying large tables, as Spark can scan only the relevant partitions instead of the entire dataset.

Dynamic Partitioning (Automatic Partitioning)

Spark automatically partitions a DataFrame based on the size of the data and available resources. The number of partitions is determined by Spark’s internal algorithm based on the data’s size and complexity.

Let’s say we have a DataFrame with columns such as name, gender, and age.

Partitioning in Databricks File System (DBFS)

When writing data to files in Databricks (e.g., Parquet, Delta), you can specify partitioning columns to optimize reads and queries. For example, when you partition by a column, Databricks will store the data in different folders based on that column’s values.


# Example of saving a DataFrame with partitioning
df.write.partitionBy("gender").parquet("/mnt/data/name_partitioned")

In this example, the data will be saved in a directory structure like:

/mnt/data/name_partitioned/gender=F
/mnt/data/name_partitioned/gender=M

Partitioning in Delta Tables

In Delta Lake (which is a storage layer on top of Databricks), partitioning is also a best practice to optimize data management and queries. When you define a Delta table, you can specify partitions to enable efficient query pruning, which results in faster reads and reduced I/O.


# Writing a Delta table with partitioning
df.write.format("delta").partitionBy("gender", "age").save("/mnt/delta/partitioned_data")

In this example, the data will be saved in a directory structure like:

/mnt/delta/partitioned_data/gender=F/age=34
/mnt/delta/partitioned_data/gender=F/age=45
/mnt/delta/partitioned_data/gender=M/age=23
/mnt/delta/partitioned_data/gender=M/age=26
/mnt/delta/partitioned_data/gender=M/age=32
/mnt/delta/partitioned_data/gender=M/age=43

Optimizing Spark DataFrame Partitioning

When working with in-memory Spark DataFrames in Databricks, you can manually control the number of partitions to optimize performance.

Repartition

This increases or decreases the number of partitions.
This operation reshuffles the data, redistributing it into a new number of partitions.


df = df.repartition(10)  # repartition into 10 partitions

Coalesce

This reduces the number of partitions without triggering a full shuffle, which makes it more efficient than repartition() when you only need fewer partitions.


df = df.coalesce(5) # reduce partitions to 5

When to Use Partitioning

  • Partitioning works best when you frequently query the data using the columns you’re partitioning by. For example, partitioning by date (e.g., year, month, day) is a common use case when working with time-series data (see the read sketch after this list).
  • Don’t over-partition: Too many partitions can lead to small file sizes, which increases the overhead of managing the partitions.
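
To illustrate the first point above, filtering on the partition columns when reading a partitioned dataset lets Spark prune partitions and scan only the matching directories (paths and column names here are hypothetical):

from pyspark.sql.functions import col
# Only the year=2024/month=1 directories are scanned, not the whole dataset
df = (spark.read.parquet("/mnt/data/events_partitioned")
      .filter((col("year") == 2024) & (col("month") == 1)))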

Key Notes

  • Partitioning cannot be changed after creation: if partitioning changes are needed, you must recreate the table.

Summary

  • Partitioning divides data into smaller, more manageable chunks.
  • It improves query performance by allowing Spark to read only relevant data.
  • You can control partitioning when saving DataFrames or Delta tables to optimize storage and query performance.
  • Use repartition() or coalesce() to manage in-memory partitions for better parallelization.
  • Use coalesce() to reduce partitions without shuffling.
  • Use repartition() when you need to rebalance data.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)