Building Slowly Changing Dimensions Type 2 in Azure Data Factory and Synapse

Within the context of enterprise data warehousing, the effective management of historical data is essential for supporting informed business decision-making. Slowly Changing Dimension (SCD) Type 2 is a widely adopted technique for addressing changes in data over time.

A brief overview of Slowly Changing Dimensions Type 2

Slowly Changing Dimensions Type 2 (SCD Type 2) is a common solution for managing historical data. To ensure clarity, I’ll briefly recap SCD Type 2.

A Type 2 of SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record.

Existing Dimension data
surrokey	depID	dep	IsActivity
1	        1001	IT	1
2	        1002	HR	1
3	        1003	Sales	1
Dimension changed and new data comes 
depId dep
1003  wholesale   <--- depID is same, name changed from "Sales" to "wholesale"
1004  Finance     <--- new data

Mark existing dimensional records as expired (inactive); create a new record for the current dimensional data; and insert new incoming data as new dimensional records.

Now, the new Dimension will be:
surrokey  depID	dep	   IsActivity
1	  1001	IT	   1   <-- No action required
2	  1002	HR	   1   <-- No action required
3	  1003	Sales	   0   <-- mark as inactive
4         1003  wholesale  1   <-- add updated active value
5         1004  Finance    1   <-- insert new data

This solution demonstrates the core concepts of a Slowly Changing Dimension (SCD) Type 2 implementation. While it covers the major steps involved, real-world production environments often have more complex requirements. When designing dimension tables (e.g., the dep table), I strongly recommend adding more descriptive columns to enhance clarity. Specifically, including [Start_active_date] and [End_active_date] columns significantly improves the traceability and understanding of dimension changes over time.

Implementing SCD Type 2

Step 1: Create a Dimension Table- dep

# Create table
create table dep (
surrokey int IDENTITY(1, 1), 
depID int, 
dep varchar(50), 
IsActivity bit);

# Insert data, 
surrokey	depID	dep	IsActivity
1	        1001	IT	1
2	        1002	HR	1
3	        1003	Sales	1

Step 2: Create Data Flow

Add the source dataset. dataset should point to file which is located in your source layer.

We have 2 data rows. That means depID =1003, updated value, a new comes depID=1004 need add into dimension table.

Step 3: Add derived column

Add derived column resource and add column name as isactive and provide the value as 1.

Step 4: Sink dimension data

Create a dataset point to SQL Server Database Table dep

Add a Sink use above dataset, SQLServer_dep_table

Configure the sink mappings as shown below

Step 5: Add SQL dataset as another source.

Step 6: Rename column from Database Table dep

Use select resource to rename columns from SQL table.

rename column name:

  • depID –> sql_depID
  • dep –> sql_dep
  • Isactivity –> sql_IsActivity

Step 7: Lookup

Add lookup to join new dimension data that we have import in “srcDep” at “Step 2”

At this step, existing dimension table “Left Join” out the new coming dimension (need update info or new comes dimension values).

  • existing dimension data, depID=1003 ,previously “dep” called “Sales” , now it need changing to “wholesales”

Step 8: filter out non-nulls

Add filter, filter out the rows which has non-nulls in the source file columns.

Filter expression : depID column is not null. 
!isNull(depid)

This requires filtering the ‘lkpNeedUpdate’ lookup output to include only rows where the depID is not null.

Step 9: Select need columns

Since up stream “filterNonNull” output more columns,

Not all columns are required. The objective is to use the new data (containing depid and dep) to update existing information in the dimension table (specifically sql_depID, sql_dep, and sql_isActivity) and mark the old information as inactive.

Add a “SELECT” to select need the columns that we are going to insert or update in Database dimension table.

Step 10: add a new column and give its value = “0”

Add a deriver, set its value is “0” , means mark it as “inactive

Step 11: alter row

Add a “Alter Row” to update row information.

configure alter conditions:

Update     1==1 

Step 12 Sink updated information

we have updated the existing rows, mark it “0” as “inactive”. it time to save it into database dimension table.

Add a “Sink” point to database dimension table – dep

mapping the columns,

sql_depid  ---> depID
sql_dep  ---> dep
ActivityStatus  ---> IsActivity

Step 13: Adjust Sink order

As there are two sinks, one designated for the source data and the other for the updated data, a specific processing order must be enforced.

Click on a blank area of the canvas, at “Settings” tag, configure them order.
1: sinkUpdated
2: sinkToSQLDBdepTable

Step 14: creata a pipeline

create a pipeline, add this data flow, run it.

SELECT TOP (5000) [surrokey]
      ,[depID]
      ,[dep]
      ,[IsActivity]
  FROM [williamSQLDB].[dbo].[dep]

surrokey  depID	  dep	       IsActivity
1	  1001	  IT	        1
2	  1002	  HR	        1
3	  1003    Sales	        0
4	  1003    Wholesale	1
5	  1004	  Finance	1

Conclusion

In conclusion, we have explored the powerful combination of Slowly Changing Dimensions Type 2, it has provided you with a comprehensive understanding of how to effectively implement SCD Type 2 in your data warehousing projects, leveraging modern technologies and following industry best practices.

By implementing SCD Type 2 according to Ralph Kimballโ€™s approach, organizations can achieve a comprehensive view of dimensional data, enabling accurate trend analysis, comparison of historical performance, and tracking of changes over time. It empowers businesses to make data-driven decisions based on a complete understanding of the dataโ€™s evolution, ensuring data integrity and reliability within the data warehousing environment.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

Summary of Commonly used T-SQL queries

SQL Execution Order:

  1. FROM โ€“ Specifies the tables involved in the query.
  2. JOIN โ€“ Joins multiple tables based on conditions.
  3. WHERE โ€“ Filters records before aggregation.
  4. GROUP BY โ€“ Groups records based on specified columns.
  5. HAVING โ€“ Filters aggregated results.
  6. SELECT โ€“ Specifies the columns to return.
  7. DISTINCT โ€“ Removes duplicate rows.
  8. ORDER BY โ€“ Sorts the final result set.
  9. LIMIT / OFFSET โ€“ Limits the number of rows returned.

TSQL Commands Categorized 

Categorized 
  • DDL โ€“ Data Definition Language
    CREATE, DROP, ALTER, TRUNCATE, COMMENT, RENAME
  • DQL โ€“ Data Query Language
    SELECT
  • DML โ€“ Data Manipulation Language
    INSERT, UPDATE, DELETE, LOCK
  • DCL โ€“ Data Control Language
    GRANT, REVOKE
  • TCL โ€“ Transaction Control Language
    BEGIN TRANSACTION, COMMIT, SAVEPOINT, ROLLBACK, SET TRANSACTION, SET CONSTRAINT
DDL command
ALTER
Alter Table
Add Column
ALTER TABLE table_name
ADD column_name data_type [constraints];
ALTER TABLE Employees
ADD Age INT NOT NULL;


Drop Column
ALTER TABLE table_name
DROP COLUMN column_name;
ALTER TABLE Employees
DROP COLUMN Age;


Modify Column (Change Data Type or Nullability)
ALTER TABLE table_name
ALTER COLUMN column_name new_data_type [NULL | NOT NULL];
ALTER TABLE Employees
ALTER COLUMN Age BIGINT NULL;


Add Constraint:
ALTER TABLE table_name ADD CONSTRAINT constraint_name constraint_type (column_name);
ALTER TABLE Employees
ADD CONSTRAINT PK_Employees PRIMARY KEY (EmployeeID);


Drop Constraint:
ALTER TABLE table_name DROP CONSTRAINT constraint_name;
ALTER TABLE Employees DROP CONSTRAINT PK_Employees;
Alter Database
Change Database Settings
ALTER DATABASE database_name
SET option_name;
ALTER DATABASE TestDB
SET READ_ONLY;


Change Collation
ALTER DATABASE database_name
COLLATE collation_name;
ALTER DATABASE TestDB
COLLATE SQL_Latin1_General_CP1_CI_AS;

ALTER VIEW
Modify an Existing View
ALTER VIEW view_name
AS
SELECT columns
FROM table_name
WHERE condition;
ALTER VIEW EmployeeView
AS
SELECT EmployeeID, Name, Department
FROM Employees
WHERE IsActive = 1;

ALTER PROCEDURE
Modify an Existing Stored Procedure
ALTER PROCEDURE procedure_name
AS
BEGIN
    -- Procedure logic
END;
ALTER PROCEDURE GetEmployeeDetails
@EmployeeID INT
AS
BEGIN
    SELECT * FROM Employees WHERE EmployeeID = @EmployeeID;
END;

ALTER FUNCTION
ALTER FUNCTION function_name
RETURNS data_type
AS
BEGIN
    -- Function logic
    RETURN value;
END;
ALTER FUNCTION GetFullName
(@FirstName NVARCHAR(50), @LastName NVARCHAR(50))
RETURNS NVARCHAR(100)
AS
BEGIN
    RETURN @FirstName + ' ' + @LastName;
END;
ALTER INDEX
Rebuild Index:
ALTER INDEX index_name
ON table_name
REBUILD;


Reorganize Index:
ALTER INDEX index_name
ON table_name
REORGANIZE;


Disable Index:
ALTER INDEX index_name
ON table_name
DISABLE;
ALTER SCHEMA
Move Object to a Different Schema
ALTER SCHEMA new_schema_name
TRANSFER current_schema_name.object_name;
ALTER SCHEMA Sales
TRANSFER dbo.Customers;

ALTER ROLE
Add Member:
ALTER ROLE role_name
ADD MEMBER user_name;
ALTER ROLE db_datareader
ADD MEMBER User1;

Drop Member:
ALTER ROLE role_name
DROP MEMBER user_name;
CREATE
Create Table
# Create Parent Table
CREATE TABLE Departments (
    DeptID INT IDENTITY(1, 1) PRIMARY KEY, -- IDENTITY column as the primary key
    DeptName NVARCHAR(100) NOT NULL
);

# Create child table with FK
CREATE TABLE Employees (
    EmpID INT IDENTITY(1, 1) PRIMARY KEY,  -- IDENTITY column as the primary key
    EmpName NVARCHAR(100) NOT NULL,
    Position NVARCHAR(50),
    DeptID INT NOT NULL,                   -- Foreign key column
    CONSTRAINT FK_Employees_Departments FOREIGN KEY (DeptID) REFERENCES Departments(DeptID)
);

DROP
Drop Database

Make sure no active connections are using the database.
To forcibly close connections, use:

ALTER DATABASE TestDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE; DROP DATABASE TestDB;
DROP DATABASE DatabaseName;
DROP DATABASE TestDB;
Drop Schema

The DROP SCHEMA statement removes a schema, but it can only be dropped if no objects exist within it.

Before dropping a schema, make sure to drop or move all objects inside it

DROP TABLE SalesSchema.SalesTable;
DROP SCHEMA SalesSchema;

DROP SCHEMA SalesSchema;
Drop Table

Dropping a table will also remove constraints, indexes, and triggers associated with the table.

DROP TABLE TableName;
DROP TABLE Employees;
Drop Column

The DROP COLUMN statement removes a column from a table.

ALTER TABLE TableName DROP COLUMN ColumnName;
ALTER TABLE Employees DROP COLUMN Position;

You cannot drop a column that is part of a PRIMARY KEY, FOREIGN KEY, or any other constraint unless you first drop the constraint.

Additional DROP Statements

Drop Index

Remove an index from a table.
DROP INDEX IndexName ON TableName;

Drop Constraint

Remove a specific constraint (e.g., PRIMARY KEY, FOREIGN KEY).
ALTER TABLE TableName DROP CONSTRAINT ConstraintName;

Order of Dependencies

When dropping related objects, always drop dependent objects first:

  1. Constraints (if applicable)
  2. Columns (if applicable)
  3. Tables
  4. Schemas
  5. Database

Example: Full Cleanup

-- Drop a column
ALTER TABLE Employees DROP COLUMN TempColumn;

-- Drop a table
DROP TABLE Employees;

-- Drop a schema
DROP SCHEMA HR;

-- Drop a database
ALTER DATABASE CompanyDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
DROP DATABASE CompanyDB;
DQL command
CTE

Common Table Expression (CTE) in SQL is a temporary result set that you can reference within a SELECTINSERTUPDATE, or DELETE statement. CTEs are particularly useful for simplifying complex queries, improving readability, and breaking down queries into manageable parts. 

WITH cte_name (optional_column_list) AS (
    -- Your query here
    SELECT column1, column2
    FROM table_name
    WHERE condition
)
-- Main query using the CTE
SELECT *
FROM cte_name;

Breaking Down Multi-Step Logic

WITH Step1 AS (
    SELECT ProductID, SUM(Sales) AS TotalSales
    FROM Sales
    GROUP BY ProductID
),
Step2 AS (
    SELECT ProductID, TotalSales
    FROM Step1
    WHERE TotalSales > 1000
)
SELECT *
FROM Step2;

Intermediate Calculations

WITH AverageSales AS (
    SELECT ProductID, AVG(Sales) AS AvgSales
    FROM Sales
    GROUP BY ProductID
)
SELECT ProductID, AvgSales
FROM AverageSales
WHERE AvgSales > 500;

Recursive CTE

WITH RECURSIVE EmployeeHierarchy AS (
    SELECT EmployeeID, ManagerID, EmployeeName
    FROM Employees
    WHERE ManagerID IS NULL -- Starting point (CEO)

    UNION ALL

    SELECT e.EmployeeID, e.ManagerID, e.EmployeeName
    FROM Employees e
    JOIN EmployeeHierarchy eh ON e.ManagerID = eh.EmployeeID
)
SELECT *
FROM EmployeeHierarchy;

Why Use CTEs?

  1. Improve Readability:
    • CTEs allow you to break down complex queries into smaller, more understandable parts.
    • They make the query logic clearer by separating it into named, logical blocks.
  2. Reusability:
    • You can reference a CTE multiple times within the same query, avoiding the need to repeat subqueries.
  3. Recursive Queries:
    • CTEs support recursion, which is useful for hierarchical or tree-structured data (e.g., organizational charts, folder structures).
  4. Simplify Debugging:
    • Since CTEs are modular, you can test and debug individual parts of the query independently.
  5. Alternative to Subqueries:
    • CTEs are often easier to read and maintain compared to nested subqueries.

Conditional Statements

IFโ€ฆELSE …
IF EXISTS (SELECT 1 FROM table_name WHERE column1 = 'value')
   BEGIN
       PRINT 'Record exists';
   END
ELSE
   BEGIN
      PRINT 'Record does not exist';
   END;
IF EXISTS …. or IF NOT EXISTS …
-- Check if rows exist in the table
IF EXISTS (SELECT * FROM Tb)
BEGIN
    -- Code block to execute if rows exist
    PRINT 'Rows exist in the table';
END;

-- Check if rows do not exist in the table
IF NOT EXISTS (SELECT * FROM Tb)
BEGIN
    -- Code block to execute if no rows exist
    PRINT 'No rows exist in the table';
END;
CASE
SELECT column1,
       CASE 
           WHEN column2 = 'value1' THEN 'Result1'
           WHEN column2 = 'value2' THEN 'Result2'
           ELSE 'Other'
       END AS result
FROM table_name;

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

Summary of SQL built-in functions

BETWEEN value1 and value2

The BETWEEN function in SQL is used to filter the result set within a specified range. It can be used with numeric, date, or textual values. Here’s a basic example:

SELECT *
FROM employees
WHERE age BETWEEN 25 AND 35;
STRING_AGG ( column_name, ”, ‘ )

The STRING_AGG() function in T-SQL (Transact-SQL) is used to concatenate values from multiple rows into a single string, separated by a specified delimiter. It’s particularly useful for combining text from multiple rows into a single row result.

SELECT STRING_AGG(column_name, ', ') AS concatenated_column
FROM table_name;
  • column_name is the name of the column containing the values you want to concatenate.
  • , is the delimiter that separates the values in the resulting string.
select id, AppDate,Company from cv where id between 270 AND 280
id	AppDate	Company
270	2021-04-24	dentalcorp  
272	2021-04-24	EMHware
274	2021-04-24	Altus Group  
276	2021-04-24	Dawn InfoTek Inc.
278	2021-04-25	Capco
280	2021-04-25	OPTrust  

select string_agg([Company],',') from cv where id between 270 AND 280 
concated
dentalcorp,EMHware,Altus Group,Dawn InfoTek Inc.,Capco,OPTrust  
Concatenating Text from Multiple Rows

For instance, suppose you have a table of employees with a column for their skills, and you want to get a list of all skills each employee has:

SELECT employee_id, STRING_AGG(skill, ', ') AS skills
FROM employee_skills
GROUP BY employee_id;
Generating a Comma-Separated List of Values

If you have a table of orders and you want to list all products in each order:

SELECT order_id, STRING_AGG(product_name, ', ') AS products
FROM order_details
GROUP BY order_id;
Creating a Summary Report

You can use STRING_AGG() to generate a summary report, for example, listing all customers in each country:

SELECT country, STRING_AGG(customer_name, '; ') AS customers FROM customers GROUP BY country;
Generating Dynamic SQL Queries
DECLARE @sql NVARCHAR(MAX); SELECT @sql = STRING_AGG('SELECT * FROM ' + table_name, ' UNION ALL ') FROM tables_to_query; EXEC sp_executesql @sql

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

Dynamic ETL Mapping in Azure Data Factory/Synapse Analytics: Source-to-Target Case Study Implementation (1)

This article is part of a series dedicated to dynamic ETL Source-to-Target Mapping (STM) solutions, covering both batch and near real-time use cases. The series will explore various mapping scenarios, including one-to-many, many-to-one, and many-to-many relationships, with implementations provided throughout.

You need create and alter metadata table privilege.

Scenario

In this article, I will focus on scenario where the source schema may have new or missing columns, or the destination schema may have columns with different names or might lack columns to accommodate new incoming source fields.

Requirement:

Dynamically handle source variations, map data to the consistent destination schema, and handle missing columns gracefully. Giving default value to missed column, add new column to target DB table if they are new coming.

Source:

CSV, Schema varies between executions (columns may be missing, reordered, or new).
current source columnsโ€™ name: name, age, gender and state

Destination:

Database, SQL DB,
columnsโ€™ name: emp_ID , emp_Name, emp_age, gender, dep_id.

Problem:

  • emp_ID and dep_id missed from source data.
  • schema name are not exactly same
    name <—-> emp_Name
    age <—-> emp_age
  • target DB table does not have the column “state”
Source data:
name age gander state
Bill 32 M NY
Mary 34 F CA
Tom 23 M FL
Jim 26 M CA
Afton_Taborek 45 F FL
Amรƒยฉlie_Gilker 43 M NY
target SQLDB Table: emp
CREATE TABLE [dbo].[emp](
[emp_id] [nvarchar](50) NULL,
[emp_name] [nvarchar](50) NULL,
[emp_age] [nvarchar](50) NULL,
[gender] [nvarchar](50) NULL,
[dep_id] [nvarchar](50) NULL,
)
col_name data_type allow_null
emp_id nvarchar(50) null
emp_name nvarchar(50) null
emp_age nvarchar(50) null
gender nvarchar(50) null
dep_id nvarchar(50) null

Key components and steps of solution.

  1. Create metadata to hold STM plan
  2. Get metadata activity retrieves source data schema – columns’ name, data type
  3. Reset fields active status to False.
  4. ForEach coming source fields in metadata table, activities/field mapping/target column
  5. Retrieving each STM mapping plan from metadata table generate complete mapping plan
  6. Copy activity applying the mapping plan to “Dynamic Content of Mapping”

Solution

Step 1: Create a metadata table to hold mapping plan

CREATE TABLE [dbo].[metadata](
	[source_filename] [varchar](max) NULL,
	[src_col] [varchar](50) NULL,
	[src_dataType] [varchar](50) NULL,
	[src_col_createdate] [datetime] DEFAULT getdate() NULL,
	[src_col_activity] [bit] NULL,
	[destination_schema] [varchar](50) NULL,
	[destination_table] [varchar](50) NULL,
	[dst_col] [varchar](50) NULL,
	[dst_dataType] [varchar](50) NULL,
	[dst_createdate] [datetime] DEFAULT getdate() NULL,
	[dst_col_activity] [bit] NULL,
	[mapping] [varchar](max) NULL
)

[mapping] column will be a json style string, that indicate this source column will map to target column’s name. Its pattern is :

{
  "source": {
     "name": "Field/column name",
     "type": "column generalized dataType",
     "physicalType":"coming column's Native dataType"
  },
  "sink": {
     "name": "Table column name",
     "type": "coming column's generalized dataType",
     "physicalType":"column's target database Native dataType"
  } 
}
“name”

Field’s name in the file, or column’s name in t DB table.

“type”

Logical Data Type. The abstract or generalized type used by Azure Data Factory (ADF) to interpret data regardless of the underlying system or format.
For example, string, integer, double etc.

“physicalType”

The specific type defined by the database or file system where the data resides.
For example, VARCHAR, NVARCHAR, CHAR, INT, FLOAT, NUMERIC(18,10), TEXT etc. in database

Each column has this source-to-sink mapping plan, we will concat all column’s mapping plan, generate a complete Source to Target mapping (STM) plan.

Step 2: Creating known field-column mapping plan

For each known field or column, create a Source-to-Target mapping plan, save it in the “mapping” column of the database metadata table, formatted in JSON style string.

# id field mapping plan
{
  "source": {
     "name": "id",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_id",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# name field mapping plan
{
  "source": {
     "name": "name",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_name",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# age field mapping plan
{
  "source": {
     "name": "age",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_age",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# gander field mapping plan
{
  "source": {
     "name": "gander",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "gender",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# "dep_id"field mapping plan
{
  "source": {
     "name": "dep_id",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "dep_id",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

We will utilize the column mapping plans to generate a comprehensive “copy activity” mapping plan.

For any new or unknown fields that may arise, we will address them in subsequent steps.

Step 3: get source metadata

Create a pipeline.

l name it “pl_dynamic_source_to_target_mapping”

Create variables

  • var_sourcename, string
  • var_field_name, string
  • var_field_type, string
  • var_mapping_plan, string

Add a “Get metadata” activity and setup it.

We need field list:

  • Item name,
  • Item type,
  • structure.

“it”get metadata” get the return

{
	"itemName": "name.csv",
	"itemType": "File",
	"structure": [
		{
			"name": "name",
			"type": "String"
		},
		{
			"name": "age",
			"type": "String"
		},
		{
			"name": "gander",
			"type": "String"
		},
		{
			"name": "state",
			"type": "String"
		}
	],
	"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
	"executionDuration": 1,
	"durationInQueue": {
		"integrationRuntimeQueue": 0
	},
	"billingReference": {
		"activityType": "PipelineActivity",
		"billableDuration": [
			{
				"meterType": "AzureIR",
				"duration": 0.016666666666666666,
				"unit": "Hours"
			}
		]
	}
}

Step 4: Reset the activity status of all source fields in the metadata table to False

Save source data name

Since we will address the item’s metadata one field by one field later, saving source data name in variable is convenient.

add a “Set variable” to save source data name in variable – “var_sourcename”

Reset all source fields to False

Add a “lookup activity”, reset the activity status of all source fields in the metadata table to False.

lookup query:

UPDATE metadata SET
src_col_activity = 0
WHERE source_filename = '@{variables('var_sourcename')}';
SELECT 1;

This is one of the important steps. It allows us to focus on the incoming source fields. When we build the complete ETL Source-to-Target mapping plan, we will utilize these incoming fields.

Step 5: ForEach address source data fields

Add the ‘ForEach activity’ to the pipeline, using the ‘structure’ to address the source data fields one by one.

Save source data field name and data type

In the ForEach activity, add two “Set variable” to save source data field name and data type in variable .
ForEach’s @item().name —> var_field_name
ForEach’s @item().type —> var_field_type

Lookup source fields in metadata table

Continue in ForEach activity, add a “lookup activity”, create a dataset point to metadata table.

Lookup query:

IF NOT EXISTS (
SELECT src_col from metadata
WHERE
source_filename = '@{variables('var_sourcename')}'
AND src_col = '@{variables('var_field_name')}'
)
BEGIN
-- Alter target table schema
IF NOT EXISTS ( SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'emp' AND COLUMN_NAME = '@{variables('var_field_name')}' )
ALTER TABLE emp ADD @{item().name} NVARCHAR(max);
SELECT 'target altered'; -- return

-- insert field metadata and STM plan
INSERT INTO metadata
(source_filename
, src_col
, src_dataType
, src_col_activity
, destination_schema
, destination_table
, dst_col
, dst_dataType
, dst_col_activity
, mapping)
VALUES
(
'@{variables('var_sourcename')}'
, '@{variables('var_field_name')}'
, '@{variables('var_field_type')}'
, 1
, 'dbo'
, 'emp'
, '@{variables('var_field_name')}'
, 'NVARCHAR'
, 1
, '{
"source": {
"name": "@{variables('var_field_name')}",
"type": "@{variables('var_field_type')}",
"physicalType":"@{variables('var_field_type')}"
},
"sink": {
"name": "@{variables('var_field_name')}",
"type": "nvarchar(max)",
"physicalType":"nvarchar(max)"
}
}'
);
SELECT 'insert field metadata';-- return
END
ELSE
BEGIN
UPDATE metadata SET src_col_activity = 1
WHERE source_filename = '@{variables('var_sourcename')}'
AND src_col = '@{variables('var_field_name')}'
select 'this field actived'; -- return
END;
  1. Check if the current source field exists in the ‘metadata’ table.
    If the field’s name is found, update its activity status to True as an existing field. If the field’s name is not present, it indicates a new field. Insert this new field into the metadata table and establish its mapping plan to specify its intended destination.
  2. Check the target table [emp] to verify if the column exists. If the column is not present, alter the schema of the target table [emp] to add a new column to the destination table.

the target table schema altered

new field, “state”, metadata inserted in to the metadata table

new field mapping plan

'{
  "source": {
     "name": "@{variables('var_field_name')}",
     "type": "@{variables('var_field_type')}",
     "physicalType":"@{variables('var_field_type')}"
  },
  "sink": {
     "name": "@{variables('var_field_name')}",
     "type": "@{variables('var_field_type')}",
     "physicalType":"@{variables('var_field_type')}"
  } 
}'

Step 6: Generate the complete ETL mapping plan

Generate complete ETL mapping plan

Add a “Lookup activity” to generate complete ETL mapping plan, use metadata table dataset.

This ‘lookup activity’ queries all activity field mapping plans from the metadata table to generate a complete STM mapping plan.

Query:
select 
concat(
'{"type": "TabularTranslator",
"mappings": [' 
, string_agg(mapping,',') 
,'],'
,'"typeConversion": true,"typeConversionSettings": {"allowDataTruncation": false, "treatBooleanAsNumber": false}'
) as stm
from metadata
where 
[source_filename] = '@{variables('var_sourcename')}' 
and [src_col_activity] = 1

Also add “Set variable” to save the STM to variable “var_mapping_plan”

@activity('lkp generate entire ETL mapping  plan').output.firstRow.stm

Step 7: Copy source data to target

Having established the dynamic mapping plan, we are now prepared to ingest data from the source and deliver it to the target. All preceding steps were dedicated to the development of the ETL mapping plan.

Copy activity: Applying the STM mapping plan

Add a “Copy activity”, using Source and Sink dataset we built previous.

changing to “Mapping” tag, click “Add dynamic content”, write expression:

@json(variables('var_mapping_plan'))

All previous steps were dedicated to building the ETL mapping plan.

Done !!!

Afterword

This article focuses on explaining the underlying logic of dynamic source-to-target mapping through a step-by-step demonstration. To clearly illustrate the workflow and logic flow, four “Set Variable” activities and four pipeline variables are included. However, in a production environment, these are not required.

Having demonstrated dynamic source-to-target mapping with all necessary logic flow steps, this solution provides a foundation that can be extended to other scenarios, such as one-to-many, many-to-one, and many-to-many mappings. Implementations for these scenarios will be provided later.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
(remove all space from the email account ๐Ÿ˜Š)

PySpark Data sources

PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data from multiple formats. such as CSV, JSON, Parquet, and ORC, Database (JDBC), as well as more advanced formats like Avro and Delta Lake, Hive

Query Database Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
# Query table using jdbc()
# Parameters
server_name = "myserver.database.windows.net"
port = "1433"
database_name = "mydatabase"
table_name = "mytable"
username = "myusername"
password = "mypassword"

# Construct the JDBC URL
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Read data from the SQL database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
# show DataFrame
df_dep.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

Query JDBC Table Parallel, specific Columns

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from emp where sex='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()
Read & Write CSV File

Read CSV File

df = spark.read.format("csv")\
.options(delimiter=',') \
.options(inferSchema='True')\
.options(header='True')\
.load("/Folder/, /filePath2/")

Write CSV File

df = spark.write.format("csv")\
.options(delimiter=',') \
.options(inferSchema='True')\
.options(header='True')\
.mode("overwrite")
.save("/filePath/filename")

By default, If you try to write directly to a file (e.g., name1.csv), it conflicts because Spark doesn’t generate a single file but a collection of part files in a directory.

path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

at this time, name1.csv in “dbfs:/FileStore/name1.csv” will treat as a directory, rather than a Filename. PySpark writes the single file with a random name like part-00000-<id>.csv

dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

we need take additional step – “Rename the File to name1.csv

# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        destination_file = "dbfs:/FileStore/name1.csv"
        
        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break


display(dbutils.fs.ls ( "dbfs:/FileStore/"))

Direct Write with Custom File Name

PySpark doesnโ€™t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

Steps:

  1. Use coalesce(1) to combine all data into a single partition.
  2. Save the file to a temporary location.
  3. Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Read & Write Parquet File

sample data save at /tmp/output/people.parquet

dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet

read from parquet

sample data save at /tmp/output/people.parquet
df1=spark.read.parquet("/tmp/output/people.parquet")
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

read a parquet is the same as reading from csv, nothing special.

write to a parquet

  • append: appends the data from the DataFrame to the existing file, if the Destination files already exist. In Case the Destination files do not exists, it will create a new parquet file in the specified location.

    df.write.mode(“append”).parquet(“path/to/parquet/file”)
  • overwrite: This mode overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.

    df.write.mode(“overwrite”).parquet(“path/to/parquet/file”)
  • ignore: If the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.

    df.write.mode(“ignore”).parquet(“path/to/parquet/file”)
  • Create Parquet partition file

    df.write.partitionBy(“gender”,”salary”).mode(“overwrite”).parquet(“/tmp/output/people2.parquet”)

Read & write Delta Table in PySpark

Read from a Delta Table

Read from a Registered Delta Table (in the Catalog)
df = spark.read.table("default.dim_dep")

df.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+
Read from a Delta Table Stored in a Directory (Path-Based)
df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_delta_table.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Write to a Delta Table

Append to a Delta Table
# Appends to the Delta table
df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")  
Overwrite a Delta Table
# Overwrites the Delta table
df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")  
Create or Overwrite a Registered Delta Table
# Overwrites the table in the catalog
df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep") 
Append to a Registered Delta Table:
# Appends to the table in the catalog
df.write.format("delta").mode("append").saveAsTable("default.dim_dep")  

merge operation (upsert)

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Perform the merge operation
target_table.alias("t").merge(
    source_df.alias("s"),
    "t.id = s.id"  # Join condition: match rows based on `id`
).whenMatchedUpdate(
    set={
        "name": "s.name",  # Update `name` column
        "date": "s.date"   # Update `date` column
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",      # Insert `id`
        "name": "s.name",  # Insert `name`
        "date": "s.date"   # Insert `date`
    }
).execute()

# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
  1. Target Table (target_table):
    • The Delta table is loaded using DeltaTable.forPath.
    • This table contains existing data where updates or inserts will be applied.
  2. Source DataFrame (source_df):
    • This DataFrame contains new or updated records.
  3. Join Condition ("t.id = s.id"):
    • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
  4. whenMatchedUpdate:
    • If a matching row is found, update the name and date columns in the target table.
  5. whenNotMatchedInsert:
    • If no matching row is found, insert the new record from the source DataFrame into the target table.
  6. execute():
    • Executes the merge operation, applying updates and inserts.
  7. Result Verification:
    • After the merge, the updated Delta table is read and displayed.

Schema Evolution

df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
Read & Write JSON file

Read a simple JSON file

# Read a simple JSON file into dataframe
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

df_simple = spark.read.format("json") \
.option("multiline","True")\
.load("dbfs:/FileStore/simple_json.json")

df_simple.printSchema()
root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)

df_simple.show()
+------------+------------+-----+-----------+-------+
|        City|RecordNumber|State|ZipCodeType|Zipcode|
+------------+------------+-----+-----------+-------+
|BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+------------+------------+-----+-----------+-------+

pay attention on “.option(“multiline”,”True”)“. since my json file is multiple lines (look at above sample data), if reading without this option, it can still run load. But the dataframe will not work. Once you show, you will get this error

df_simple = spark.read.format(“json”).load(“dbfs:/FileStore/simple_json.json”)
df_simple,show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).

Reading from Multiline JSON (JSON Array) File

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
df_jsonarray_simple = spark.read\
    .format("json")\
    .option("multiline", "true")\
    .load("dbfs:/FileStore/jsonArrary.json")

df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

read complex json

df_complexjson=spark.read\
    .option("multiline","true")\
    .json("dbfs:/FileStore/jsonArrary2.json")

df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0001                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Cake                                                                                                                                      
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}                                                           
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}] 
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0002                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Raised                                                                                                                                    
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}]}                                                                                                                       
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]                                                           
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0003                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Old Fashioned                                                                                                                             
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}]}                                                                                                    
 topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]                                                                          

Reading from Multiple files at a time

# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json','resources/zipcode2.json'])

Reading from Multiple files at a time

# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")

Reading files with a user-specified custom schema

# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()

Reading File using PySpark SQL

spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Write to JSON

Options

  • path: Specifies the path where the JSON files will be saved.
  • mode: Specifies the behavior when writing to an existing directory.
  • compression: Specifies the compression codec to use when writing the JSON files (e.g., โ€œgzipโ€, โ€œsnappyโ€).
    df2.write . option(“compression”, “gzip”)
  • dateFormat: Specifies the format for date and timestamp columns.
    df.write . option(“dateFormat”, “yyyy-MM-dd”)
  • timestampFormat: Specifies the format for timestamp columns.
    df.write . option(“timestampFormat“, “yyyy-MM-dd’T’HH:mm:ss.SSSXXX”)
  • lineSep: Specifies the character sequence to use as a line separator between JSON objects. \n (Unix/Linux newline); \r\n (Windows newline)
    df . write . option(“lineSep”, “\r\n”)
  • encoding: Specifies the character encoding to use when writing the JSON files.
    UTF-8, UTF-16, ISO-8859-1 (Latin-1), Other Java-supported encodings.
    df . write . option(“encoding”, “UTF-8”)
df2.write . format("json")
 . option("compression", "gzip") \
 . option("dateFormat", "yyyy-MM-dd") \ 
 . option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
 . option("encoding", "UTF-8") \
 . option("lineSep", "\r\n")
 . save("dbfs:/FileStore/output_json")

modes

  1. Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
  2. Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
  3. Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
  4. Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
Read & Write SQL Server

Read from SQL Server

server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"

jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

alternative way

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .load()

Select Specific Columns to Read

In the above example, it reads the entire table into PySpark DataFrame. Sometimes you may not be required to select the entire table, so to select the specific columns, specify the query you wanted to select with dbtable option.

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("databaseName",database_name) \
  .option("query", "select * from [dbo].[dep] where depid>3") \
  .option("user", username) \
  .option("password", password) \
  .load()

df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    4|human resource|
|    5|         admin|
+-----+--------------+

write to table

write mode:

Append:
mode("append") to append the rows to the existing SQL Server table.

# we have define variables, here is show again
server_name = “mainri-sqldb.database.windows.net”
port=1433
username=”my login name”
password=”my login password”
database_name=”mainri-sqldb”
table_name=”dep”
jdbc_url = f”jdbc:sqlserver://{server_name}:{port};databaseName={database_name}”

# append the rows to the existing SQL Server table.

df_newrow.show()
+-----+-----+
|depid|  dep|
+-----+-----+
|    5|admin|
+-----+-----+

df_newrow.write \
  .format("jdbc") \
  .mode("append") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
|    5|         admin| <- new added row
+-----+--------------+

overwrite

The mode("overwrite") drops the table if already exists by default and re-creates a new one without indexes. Use option(“truncate”,”true”) to retain the index.

df_newrow.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
Read & Write MySQL

Read from MySQL

# Read from MySQL Table
val df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Select Specific Columns to Read

# Read from MySQL Table
val df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("numPartitions",4) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Write to MySQL

Some points to note while writing

  • To re-write the existing table, use the mode("overwrite"). This drops the table if already exists by default and re-creates a new one without indexes.
  • To retain the indexes, use option("truncate","true").
  • By default, the connector uses READ_COMMITTED isolation level. To change this use option("mssqlIsolationLevel", "READ_UNCOMMITTED").
  • The dbtable option is used in PySpark to specify the name of the table in a database that you want to read data from or write data to.
# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
Read JDBC in Parallel

PySpark jdbc() method with the option numPartitions you can read the database table in parallel. This option is used with both reading and writing. 

The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

# Read Table in Parallel
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable","employee") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Select columns with where clause

# Select columns with where clause
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Using fetchsize with numPartitions to Read

The fetchsize is another option which is used to specify how many rows to fetch at a time, by default it is set to 10. The JDBC fetch size determines how many rows to retrieve per round trip which helps the performance of JDBC drivers.  Do not set this to very large number as you might see issues.

# Using fetchsize
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Read & Write delta table, Catalog, Hive Table

Read delta table, Catalog, Hive Table

# Read Hive table
df = spark.sql("select * from emp.employee")
df.show()
# Read Hive table
df = spark.read.table("employee")
df.show()
# Read delta table
df = spark.sql("select * from delta.` table path `  ")
df.show()

caution: ` Backticks 

Write / Save

To save a PySpark DataFrame to Hive table use saveAsTable() function or use SQL CREATE statement on top of the temporary view.

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

use SQL, temporary view

# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView. 
spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")

# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

PySpark DataFrame

PySpark DataFrame is a distributed collection of rows, similar to a table in a relational database or a DataFrame in Python’s pandas library. It provides powerful tools for querying, transforming, and analyzing large-scale structured and semi-structured data.

PySpark apply functions

Apply a function to a column

df.withColumn("Upper_Name", upper(df.Name))
df.select("Seqno","Name", upper(df.Name))

df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB")

def upperCase(str):
    return str.upper()
upperCaseUDF = udf(upperCase,StringType())
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB")
collect ( )

collect () is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
dataCollect = deptDF.collect()
print(dataCollect)
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
Column Class

Access column

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)
+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

Column Operators

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+
#Arthmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

df.select(df.col2 < df.col3).show()
+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

df.select(df.col2 == df.col3).show()
+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+
Convert DataFrame to Pandas

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.

pandasDF = pysparkDF.toPandas()
Convert RDD to DataFrame
df = rdd.toDF()
Create an empty DataFrame

Create an empty DataFrame

#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
dropDuplicates, distinct ()

key different between distinct() and dropDuplicates()

  • distinct() considers all columns when identifying duplicates, while dropDuplicates() allowing you to specify a subset of columns to determine uniqueness.
  • distinct() function treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them will be retained after applying distinct().
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |  #duplicated
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df.distinct().show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
Distinct count: 9
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

df2 = df.dropDuplicates(["department"])
print("Distinct count: "+str(df2.count()))
Distinct count: 3
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
+-------------+----------+------+
fillna() & fill()

DataFrame.fillna() and DataFrameNaFunctions.fill() to replace NULL/None values.

# Prepare Data
data = [("James", None, 3000), \
    ("Michael", "Sales", None), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", None), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", None, 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |null      |3000  |
|Michael      |Sales     |null  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |null  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |null      |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|      null|  3000|
|      Michael|     Sales|     0|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|     0|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|      null|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
#Replace 0 for null on only population column 
df.na.fill(value="unknown",subset=["department"]).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|   unknown|  3000|
|      Michael|     Sales|  null|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  null|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|   unknown|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
groupBy ( )

groupBy ( ), Similar to SQL GROUP BY clause,  transformation that is used to group rows that have the same values in specified columns into summary rows

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|     Sales|   NY|     176000|     30000|
|     Sales|   CA|      81000|     23000|
|   Finance|   CA|     189000|     47000|
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
| Marketing|   CA|      80000|     18000|
+----------+-----+-----------+----------+
join ( )
  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame.
  • Right Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame.
  • Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows.
  • Left Semi Join: Returns all rows from the left DataFrame where there is a match in the right DataFrame.
  • Left Anti Join: Returns all rows from the left DataFrame where there is no match in the right DataFrame.
  • Self Join:
# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
# Inner join
deptDF.join(empDF, deptDF.dept_id==empDF.emp_dept_id, "inner").show()
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
# Left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
return: left-hand has, right-hand has too
# Left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
return: left-hand has, but right-hand does not have.
# Self join
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
orderBy( ) and sort( )

orderBy() and sort() can be interchange each other

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
# Sorting different columns in different orders

df.sort("state", "age",ascending=[False,True]).show()
df.sort(df["state"].desc(), df["age"].asc()).show()
df.orderBy("state", "age",ascending=[False,True]).show()
df.orderBy(df["state"].desc(), df["age"].asc()).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+
partitionBy ( )

partitionBy ( )

pivot ( ) & Unpivot ( )

pivot() (Row to Column)

#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
# Output
root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

# Applying pivot()
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

# Output
root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

# one more example
pivotDF = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
root
 |-- Country: string (nullable = true)
 |-- Banana: long (nullable = true)
 |-- Beans: long (nullable = true)
 |-- Carrots: long (nullable = true)
 |-- Orange: long (nullable = true)

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|China  |400   |1500 |1200   |4000  |
|USA    |1000  |1600 |1500   |4000  |
|Mexico |null  |2000 |null   |null  |
|Canada |2000  |null |2000   |null  |
+-------+------+-----+-------+------+

Unpivot 

PySpark SQL doesnโ€™t have unpivot function hence will use the stack() function. 

# Applying unpivot()
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+
sample(), sampleBy()

sample(), is a mechanism to get random sample records from the dataset

Syntax

sample(withReplacement, fraction, seed=None)

  • fraction โ€“ Fraction of rows to generate, range [0.0, 1.0]. Note that it doesnโ€™t guarantee to provide the exact number of the fraction of records.
  • seed โ€“ Seed for sampling (default a random seed). Used to reproduce the same random sampling.
  • withReplacement โ€“ Sample with replacement or not (default False).
df=spark.range(100)
print(df.sample(0.06).collect())

#Output: [Row(id=0), Row(id=2), Row(id=17), Row(id=25), Row(id=26), Row(id=44), Row(id=80)]

Above example, my DataFrame has 100 records and I wanted to get 6% sample records which are 6 but the sample() function returned 7 records. This proves the sample function doesnโ€™t return the exact fraction specified.

To get consistent same random sampling uses the same slice value for every run. Change slice value to get different results.

print(df.sample(0.1,123).collect())
//Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,123).collect())
//Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,456).collect())
//Output: 19,21,42,48,49,50,75,80

sampleBy()

sampleBy(col, fractions, seed=None)

df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

//Output: [Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]
select ( )
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+
# Select columns by different ways
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

# By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Nested Struct Columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+
# Select child columns
df2.select("name.firstname","name.lastname").show(truncate=False)
+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+
# Select all child columns
df2.select("name.*").show(truncate=False)
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+
show ( )
df.show()
+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+
df.show(truncate=False)
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)
-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
only showing top 3 rows
StructType & StructField

StructType

Defines the structure of the DataFrame. StructType represents a schema, which is a collection of StructField objects. A StructType is essentially a list of fields, each with a name and data type, defining the structure of the DataFrame. It allows for the creation of nested structures and complex data types.

StructField

StructField โ€“ Defines the metadata of the DataFrame column. It represents a field in the schema, containing metadata such as the name, data type, and nullable status of the field. Each StructField object defines a single column in the DataFrame, specifying its name and the type of data it holds.

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

nested StructType

# nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+
transform ( )

transform ( )

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# PySpark transform() Usage
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount)
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+
UDF

UDF (User Defined Function)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
#create a python function
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 
#Convert a Python function to PySpark UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z),StringType())
or 
convertUDF = udf(convertCase,StringType())
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

Registering PySpark UDF & use it on SQL

In order to use convertCase() function on PySpark SQL, you need to register the function with PySpark by using spark.udf.register().

spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
union ( ) & unionAll ( )

union ( ) & unionAll ( ) are the same result. unionAll is older, retired


df1
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+
df2
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
df.union(df2).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|<--duplicated
|Maria        |Finance   |CA   |90000 |24 |23000|<--duplicated
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
unionByName ( )

df1.unionByName(df2, allowMissingColumns=Ture)
the schemas and order can be different in df1 and df2

df1
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+
df2
+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+
# different columns order
df3 = df1.unionByName(df2)
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+
# different columns name and order 
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+
df3=df1.unionByName(df2, allowMissingColumns=True).show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+
where() & filter()

where() & filter() can replace each other

  • Use &, |, ~ for logical operations (AND, OR, NOT).
  • Use ==, !=, >, <, >=, <= for comparisons.
  • Always wrap column references in col() for clarity.
  • For SQL-like patterns, consider using functions like like, isin, and between.
  • IS NULL –> “isNull ( )”
  • IS NOT NULL –> “isNotNull ( )”
  • LIKE –> “like ( %abc% )”
  • IN –> “isin (18, 21, 25)”
  • BETWEEN –> “between(18, 25)”
+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
df.select("gender").filter(df.gender == "M").show()
df.select("gender").where(df.gender == "F").show()
+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     M|
+------+

+------+
|gender|
+------+
|     F|
|     F|
+------+
withColumn()
from pyspark.sql.functions import date_add, col
df.withColumn("dob", date_add("dob", 10)).\
withColumn("newsalary",col("salary")*100).\
drop("middlename").show()
+---------+--------+----------+------+------+---------+
|firstname|lastname|       dob|gender|salary|newsalary|
+---------+--------+----------+------+------+---------+
|    James|   Smith|1991-04-11|     M|   300|    30000|
|  Michael|        |2000-05-29|     M|   400|    40000|
|   Robert|Williams|1978-09-15|     M|   400|    40000|
|    Maria|   Jones|1967-12-11|     F|   400|    40000|
|      Jen|   Brown|1980-02-27|     F|    -1|     -100|
+---------+--------+----------+------+------+---------+
withColumnRenamed ( )

withColumnRenamed() rename a DataFrame column, we often need to rename one column or multiple (or all) columns on PySpark DataFrame

+--------------------+----------+------+------+
|                name|       dob|gender|salary|
+--------------------+----------+------+------+
|    {James, , Smith}|1991-04-01|     M|  3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|    -1|
+--------------------+----------+------+------+
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.show()
+--------------------+-----------+------+-------------+
|                name|DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|    {James, , Smith}| 1991-04-01|     M|         3000|
|   {Michael, Rose, }| 2000-05-19|     M|         4000|
|{Robert, , Williams}| 1978-09-05|     M|         4000|
|{Maria, Anne, Jones}| 1967-12-01|     F|         4000|
|  {Jen, Mary, Brown}| 1980-02-17|     F|           -1|
+--------------------+-----------+------+-------------+

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

PySpark Built-in Functions

PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.

Visual Summary of Categories

CategoryFunctions
Basic Functionsalias, cast, lit, col, when, isnull, isnan
String Functionsconcat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap
Date and Time Functionscurrent_date, datediff, to_date, year, hour, unix_timestamp, date_format
Mathematical Functionsabs, round, floor, sqrt, pow, exp, log, sin, cos, rand
Aggregation Functionscount, sum, avg, min, max, stddev, collect_list
Array and Map Functionsarray, size, array_contains, explode, map_keys, map_values
Null Handling Functionsisnull, na.fill, na.drop, na.replace
Window Functionsrow_number, rank, ntile, lag, lead, cume_dist, percent_rank
Statistical Functionscorr, covar_samp, approx_count_distinct, percentile_approx
UDF and Advanced Functionsudf, udf for SQL, Ppandas_udf, broadcast, schema_of_json, to_json
sample DataFrames
Sample dataframe 
df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
dfc:
+---+-----+-----------------------+
|id |color|current_datetime       |
+---+-----+-----------------------+
|1  |F    |2024-12-03 00:46:02.165|
|2  |E    |2024-12-03 00:46:02.165|
|3  |D    |2024-12-03 00:46:02.165|
|4  |G    |2024-12-03 00:46:02.165|
|6  |J    |2024-12-03 00:46:02.165|
|5  |I    |2024-12-03 00:46:02.165|
|7  |H    |2024-12-03 00:46:02.165|
|8  |K    |2024-12-03 00:46:02.165|
|9  |L    |2024-12-03 00:46:02.165|
+---+-----+-----------------------+

PySpark datetime related functions

PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.

date time Formatting

Common Date Format Patterns:
yyyy: Year
MM: Month
dd: Day
HH: Hour
mm: Minute
ss: Second

from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
          .show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime       |formatted_date|
+---+-----+-----------------------+--------------+
|1  |F    |2024-12-03 00:46:02.165|2024-12-03    |
|2  |E    |2024-12-03 00:46:02.165|2024-12-03    |
+---+-----+-----------------------+--------------+
Converting Between Types
  • to_date(column, format): Converts a string to a date.
  • unix_timestamp(column, format): Converts a string to a Unix timestamp.
from pyspark.sql.functions import to_date, unix_timestamp

dfc.withColumn("date_only", to_date("current_date")) \
    .withColumn("unix_time", unix_timestamp("current_date"))\          .select("current_date","date_only","unix_time")\
.show(truncate=False)
+--------------+----------+----------+
|current_date()|date_only |unix_time |
+--------------+----------+----------+
|2024-12-06    |2024-12-06|1733443200|
|2024-12-06    |2024-12-06|1733443200|
  • to_timestamp(column, format): Converts a string to a timestamp, default format of MM-dd-yyyy HH:mm:ss.SSS.
  • from_unixtime(unix_time, format): Converts a Unix timestamp to a string.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit

df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
|    string|
+----------+
|2024-12-05|
+----------+

df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
|    string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+

df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
|    string|unixTimeStamp|
+----------+-------------+
|2024-12-05|   1733343200|
+----------+-------------+

df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
|   1733343200|                              2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
Date time Arithmetic / calculations
  • date_add ()
  • date_sub ()
  • add_month ()

df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()
+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )

PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.

from pyspark.sql.functions import col, current_date, datediff

df2 = df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(),col("date")).alias("datediff")
    )
+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2024-12-06|    1985|
|2019-06-24|  2024-12-06|    1992|
|2019-08-24|  2024-12-06|    1931|
+----------+------------+--------+
months_between ( )

PySpark SQL months_between() function to get the number of months between two dates

from pyspark.sql.functions import col, current_date, datediff, months_between, round

df3 = df.withColumn("today", current_date())\
    .withColumn("monthsDiff", months_between(current_date(), col("date"))) \
    .withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
+---+----------+----------+-----------+----------------+
| id|      date|     today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
|  1|2019-07-01|2024-12-06|65.16129032|           65.16|
|  2|2019-06-24|2024-12-06|65.41935484|           65.42|
|  3|2019-08-24|2024-12-06|63.41935484|           63.42|
+---+----------+----------+-----------+----------------+
Differences Between Dates in Years

utilize the months_between() function to get the difference in months and then convert it into years.

from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit

df4 = df.withColumn("today", current_date()) \
  .withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
  .withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))

+---+----------+----------+-----------------+---------------+
| id|      date|     today|        yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
|  1|2019-07-01|2024-12-06|5.430107526666667|           5.43|
|  2|2019-06-24|2024-12-06|5.451612903333333|           5.45|
|  3|2019-08-24|2024-12-06|5.284946236666666|           5.28|
+---+----------+----------+-----------------+---------------+
timediff(column1, column2) Calculates the difference between two

Calculates the difference between two

trunc ( )

trunc(column, format) truncate month/year, set to first of day in month/year.
e.g. 2024-10-08, truncate month –> 2024-10-01; truncate year –> 2024-01-01

df.select(col("input"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()
+----------+----------+-----------+
|     input|Month_Year|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval

interval Used for advanced time calculations (not directly available but works with PySpark SQL).


Extracting Components from a Datetime
  • year(column): Extracts the year.
  • quarter(column): Returns the quarter as an integer from a given date or timestamp.
  • dayofyear(column): Extracts the day of the year from a given date or timestamp.
  • dayofmonth(column): Extracts the day of the month.
  • dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
  • weekofyear(column): Returns the week number of the year.
  • last_day(column): Return the last day of the month for a given date or timestamp column.The result is a date column where each date corresponds to the last day of the month for the original dates in the specified column.
  • next_day (column, day_of_week) e.g. Mon, Sunday
  • hour(column): Extracts the hour.
  • minute(column): Extracts the minute.
  • second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second

df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input                  |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1      |1    |2   |46    |2     |
|2023-01-11 15:35:32.265|2023|1      |1    |15  |35    |32    |
|2022-09-17 22:16:02.186|2022|3      |9    |22  |16    |2     |
+-----------------------+----+-------+-----+----+------+------+

from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format

df.select(date_format("input","yyy-MM-dd").alias("input"), 
    dayofweek("input").alias('dayofweek'), 
     dayofmonth("input").alias('dayofmonth'),
     weekofyear("input").alias("weekofyear") ,
     next_day("input","mon").alias("nextday"),
     last_day("input").alias('lastday')
  ).show()
+----------+---------+----------+----------+----------+----------+
|     input|dayofweek|dayofmonth|weekofyear|   nextday|   lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01|        2|         1|         1|2024-01-08|2024-01-31|
|2023-01-11|        4|        11|         2|2023-01-16|2023-01-31|
|2022-09-17|        7|        17|        37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
Filtering – current_date, current_timestamp
  • current_date (),
  • current_timestamp ()
from pyspark.sql.functions import current_date, current_timestamp

dfc.withColumn ("current_date", current_date())\
    .withColumn ("current_timestamp", current_timestamp())\
    .select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp      |
+------------+-----------------------+
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
+------------+-----------------------+

PySpark string related functions

btrim (str[, trim] ), trim ( )

btrim(str[, trim]) Trim characters at the beginning and end of the string โ€˜strโ€™ are removed.

  • trim (): Removes only whitespace from the beginning and end of a string.
    when you only need to clean up whitespace from strings.
  • btrim(str[, trim]): Removes all specified leading and trailing characters from a string.
    when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim
+------------ +
|      text   |
+-------------+
|   hello     |
|  !!spark!!  |
| **PySpark** |
+-------------+
df.withColumn("trimmed", trim("text")).show()
+----------+---------+
|      text|  trimmed|
+----------+---------+
|   hello  |hello|
| !!spark!!|!!spark!!|
|**PySpark**|**PySpark**|
+----------+---------+
df.withColumn("trimmed_custom", btrim("text", " !*")).show()
+-----------+--------------+
|      text |trimmed_custom|
+-----------+--------------+
|   hello   |hello|
| !!spark!! |spark|
|**PySpark**|PySpark|
+-----------+--------------+
concat ( ) , concat_ws ()

concatenates multiple string columns or expressions into a single string column. with a specified delimiter between the values.

  • concat ( ) : No delimiter is added between the concatenated values.
    when you need strict concatenation without any delimiters.
  • concat_ws (): with a specified delimiter between the values.
    when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit

df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|      null|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+

df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|     Alice|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+
concat_ws() can process "null"; concat() cannot.
endswith ( )

endswith() Returns a boolean.

+--------------+
|          text|
+--------------+
|   hello world|
|PySpark is fun|
|       welcome|
+--------------+
df.select("text",(col("text").endswith("fun")).alias("end_with?")).show ()
+--------------+---------+
|          text|end_with?|
+--------------+---------+
|   hello world|    false|
|PySpark is fun|     true|
|       welcome|    false|
+--------------+---------+

df_filtered = df.filter(df["text"].endswith("fun"))
+--------------+
|          text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )

contains () check whether a PySpark DataFrame column contains a specific string or not,

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
|     full_name|contain?|
+--------------+--------+
|      John Doe|   false|
|    Jane Smith|    true|
|Robert Johnson|   false|
+--------------+--------+

df.filter(col("full_name").contains(substring_to_check)).show ()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )

find_in_set(str, str_array), Provides the 1-based index of the specified string (str) in the comma-delimited list (strArray).

length ( )

length ( ) Provides the length of characters for string data or the number of bytes for binary data.

from pyspark.sql.functions import length

df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
|      text|char_length|
+----------+-----------+
|     hello|          5|
|   PySpark|          7|
|Databricks|         10|
+----------+-----------+
like ( )

like ( ) use && and || operators to have multiple conditions in Scala.

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
|          name|   R%|  %th|%John%|
+--------------+-----+-----+------+
|      John Doe|false|false|  true|
|    Jane Smith|false| true| false|
|Robert Johnson| true|false|  true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
|          name|
+--------------+
|Robert Johnson|
+--------------+
startswith ( )
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )

substring (str, pos[, len]): Returns the substring of str that starts at pos and is of length len

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+
from pyspark.sql.functions import substring

df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
|    Ja|       Ja|
|    An|       An|
|    To|       To|
|    To|       To|
+------+---------+
split ( )

PySpark โ€“ split(), Splitting a column into multiple columns

from pyspark.sql.functions import split

df_with_split = df.select("full_name", split(df["full_name"], ",").alias("split_names")).show()
+--------------+-----------------+
|     full_name|      split_names|
+--------------+-----------------+
|      John,Doe|      [John, Doe]|
|    Jane,Smith|    [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+

split_columns = split(df["full_name"], ",")
df_with_split = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_with_split.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+

df_expanded = df_with_split.select(
    "full_name",
    df_with_split["split_names"].getItem(0).alias("first_name"),
    df_with_split["split_names"].getItem(1).alias("last_name")
).show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+
translate ( )

 translate() string function can replace character by character of DataFrame column value.

from pyspark.sql.functions import translate
d.withColumn ("replaced", translate("new_color", "aoml","A0N_")).show(3)
+-----+----------------+----------------+
|color|       new_color|        replaced|
+-----+----------------+----------------+
|    F|almost colorless|A_N0st c0_0r_ess|
|    E|            null|            null|
|    D|       colorless|       c0_0r_ess|
+-----+----------------+----------------+
regexp_replace ( )

PySpark โ€“ regexp_replace() replace a column value with a string for another string/substring

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

from pyspark.sql.functions import regexp_replace

df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)
+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address', 
    when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
   .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False
+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+
overlay

PySpark โ€“ overlay()

+---------------+----+
|           col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay

df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
|      overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
upper ( ), lower ( ), initcap ( )
  • upper: Converts all characters in the column to uppercase.
  • lower: Converts all characters in the column to lowercase.
  • initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
|             text|
+-----------------+
|      hello world|
|spark sql example|
|  UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap

df.withColumn("Uppercase", upper("text"))\
   .withColumn("lowercase", lower("text"))\
    .withColumn("Capitalized", initcap("text"))\
    .show()
+-----------------+-----------------+-----------------+-----------------+
|             text|        Uppercase|        lowercase|      Capitalized|
+-----------------+-----------------+-----------------+-----------------+
|      hello world|      HELLO WORLD|      hello world|      Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
|  UPPER and LOWER|  UPPER AND LOWER|  upper and lower|  Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+

Numeric Functions

Mathematical operations on numeric columns.

abs()

abs(): Absolute value.

from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )

round(): Round to a specific number of decimals.

from pyspark.sql.functions import round
df.select(round(df["column"], 2))
pow ( )

pow(): Power function.

from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))

Aggregate Functions

sample df
sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
distinct, countdistinct, approx_count_distinct
  • approx_count_distinct (): returns the count of distinct items in a group
  • countdistinct: returns the count of distinct items in a group
  • distinct ( ): distinct rows
from pyspark.sql.functions import approx_count_distinct, countDistinct

df.select(approx_count_distinct("salary").alias("approx_count_distinct"), 
          countDistinct("salary").alias("countDistinct"),
          ).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
|                    6|            6|
+---------------------+-------------+

df.select("salary").distinct().show()
+------+
|salary|
+------+
|  3000|
|  4600|
|  4100|
|  3300|
|  3900|
|  2000|
+------+
avg, sum, sumDistinct, max (), min (), mean ( )
  • avg ( ): average of values in the input column
  • sum ( )
  • sumDistinct ( ): returns the sum of all distinct values in a column.
  • max ( )
  • min ( )
  • mean ( )
from pyspark.sql.functions import avg,sum, max, min

df.select(avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
    ).show()
+------+-----+----+----+
|   avg|  sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+

from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
sample df
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4| null|
+---+-----+
Specifically calculates the mean. Used with select() or groupBy().Returns mean for specified columns.
+----------+
|mean_value|
+----------+
|      20.0|
+----------+
first (), last ()
  • first() returns the first element in a column. When ignoreNulls is set to true, it returns the first non-null element.
  • last() returns the last element in a column. when ignoreNulls is set to true, it returns the last non-null element.
from pyspark.sql.functions import first, last

df.select(first("salary").alias("first"),
         last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
collect_list ( )

PySpark โ€“ collect_list() returns all values from an input column with duplicates.

from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )

PySpark โ€“ collect_set() returns all values from an input column without duplicates.

from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+

PySpark Window functions

PySparkโ€™s Window Ranking functions, like row_number()rank(), and dense_rank(), assign sequential numbers to DataFrame rows based on specified criteria within defined partitions. These functions enable sorting and ranking operations, identifying row positions in partitions based on specific orderings.

  • row_number() assigns unique sequential numbers,
  • rank() provides the ranking with gaps,
  • dense_rank() offers ranking without gaps.
row_number ()

row_number() window function gives the sequential row number starting from 1 to the result of each window partition.

from pyspark.sql.window import Window
from pyspark.sql.functions import col,  row_number

windowSpec  = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
    .where(col("row_number") < 4).show()

+---+---------+-----+----------+
|_c0|      cut|color|row_number|
+---+---------+-----+----------+
|677|     Fair|    D|         1|
|772|     Fair|    D|         2|
|940|     Fair|    D|         3|
| 43|     Good|    D|         1|
| 44|     Good|    D|         2|
|239|     Good|    D|         3|
| 63|    Ideal|    D|         1|
| 64|    Ideal|    D|         2|
|121|    Ideal|    D|         3|
| 55|  Premium|    D|         1|
| 62|  Premium|    D|         2|
|151|  Premium|    D|         3|
| 29|Very Good|    D|         1|
| 35|Very Good|    D|         2|
| 39|Very Good|    D|         3|
+---+---------+-----+----------+
rank ()

rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()

+-----+---------+-----+-----+----+
|  _c0|      cut|color|price|rank|
+-----+---------+-----+-----+----+
|   29|Very Good|    D|  357|   1|
|28262|Very Good|    D|  357|   1|
|28272|     Good|    D|  361|   3|
|28273|Very Good|    D|  362|   4|
|28288|Very Good|    D|  367|   5|
|31598|    Ideal|    D|  367|   5|
|31601|  Premium|    D|  367|   5|
|31602|  Premium|    D|  367|   5|
|31618|Very Good|    D|  373|   9|
|34922|Very Good|    D|  373|   9|
|38277|  Premium|    D|  386|  11|
|38278|  Premium|    D|  386|  11|
|38279|  Premium|    D|  386|  11|
|38280|  Premium|    D|  386|  11|
|41581|Very Good|    D|  388|  15|
|41582|Very Good|    D|  388|  15|
dense_rank ()

dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps.

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
|  _c0|color|price|dense_rank|
+-----+-----+-----+----------+
|   29|    D|  357|         1|
|28262|    D|  357|         1|
|28272|    D|  361|         2|
|28273|    D|  362|         3|
|28288|    D|  367|         4|
|31598|    D|  367|         4|
|31601|    D|  367|         4|
|31602|    D|  367|         4|
|31618|    D|  373|         5|
|34922|    D|  373|         5|
|38277|    D|  386|         6|
|38278|    D|  386|         6|
|38279|    D|  386|         6|
percent_rank ()
from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0  |color|price|percent_rank         |
+-----+-----+-----+---------------------+
|29   |D    |357  |0.0                  |
|28262|D    |357  |0.0                  |
|28272|D    |361  |2.952465308532625E-4 |
|28273|D    |362  |4.428697962798937E-4 |
|28288|D    |367  |5.90493061706525E-4  |
|31598|D    |367  |5.90493061706525E-4  |
|31601|D    |367  |5.90493061706525E-4  |
|31602|D    |367  |5.90493061706525E-4  |
|31618|D    |373  |0.00118098612341305  |
|34922|D    |373  |0.00118098612341305  |
|38277|D    |386  |0.0014762326542663124|
|38278|D    |386  |0.0014762326542663124|
lag (), lead ( )
  • lag ( ) function allows you to access a previous rowโ€™s value within the partition based on a specified offset.
  • lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag,lead

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
  .withColumn("lead",lead("salary",2).over(windowSpec))\
  .show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
|        Maria|   Finance|  3000|null|3900|
|        Scott|   Finance|  3300|null|null|
|          Jen|   Finance|  3900|3000|null|
|        Kumar| Marketing|  2000|null|null|
|         Jeff| Marketing|  3000|null|null|
|        James|     Sales|  3000|null|4100|
|        James|     Sales|  3000|null|4100|
|       Robert|     Sales|  4100|3000|4600|
|         Saif|     Sales|  4100|3000|null|
|      Michael|     Sales|  4600|4100|null|
+-------------+----------+------+----+----+
ntile ( )

ntile ( ) returns the relative rank of result rows within a window partition

from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+

PySpark json related functions

sample data
+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )

The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.

from pyspark.sql.functions import explode
explode(col)

col: The name of the column or an expression containing an array or map to be exploded.

Return a new row for each element in the array or each key-value pair in the map.

# Usage with Arrays:
# Sample data
data = [
    (1, ["a", "b", "c"]),
    (2, ["d", "e"]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "letters"])
+---+------+
| id|letter|
+---+------+
|  1|     a|
|  1|     b|
|  1|     c|
|  2|     d|
|  2|     e|
+---+------+

# Usage with Maps:
# Sample data
data = [
    (1, {"key1": "value1", "key2": "value2"}),
    (2, {"key3": "value3"}),
    (3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])

# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+-------+
| id| key|  value|
+---+----+-------+
|  1|key1| value1|
|  1|key2| value2|
|  2|key3| value3|
+---+----+-------+

work with json

Exploding a JSON Array

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import ArrayType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "values": ["a", "b", "c"]}',),
    ('{"id": 2, "values": ["d", "e"]}',),
    ('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data                           |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]}     |
|{"id": 3, "values": []}             |
+------------------------------------+

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values   |
+---+---------+
|1  |[a, b, c]|
|2  |[d, e]   |
|3  |[]       |
+---+---------+

# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    c|
|  2|    d|
|  2|    e|
+---+-----+

Exploding a JSON Map

from pyspark.sql.types import MapType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
    ('{"id": 2, "properties": {"key3": "value3"}}',),
    ('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data                                                    |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}}                  |
|{"id": 3, "properties": {}}                                  |
+-------------------------------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties                      |
+---+--------------------------------+
|1  |{key1 -> value1, key2 -> value2}|
|2  |{key3 -> value3}                |
|3  |{}                              |
+---+--------------------------------+

# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+
  • Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
  • Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed.
from_json ( )

from_json ( ): Converts JSON string into Struct type or Map type.

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object

get_json_object () Extracts JSON element from a JSON string based on json path specified.

from pyspark.sql.functions import get_json_object
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
json_tuple ( )

json_tuple() Extract the Data from JSON and create them as a new columns.

from pyspark.sql.functions import json_tuple
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )

to_json ()  is used to convert DataFrame columns MapType or Struct type to JSON string

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )

schema_of_json ( ) function in PySpark is used to infer the schema of a JSON string column or JSON string literal. It is particularly useful when you want to work with complex JSON data and need to define its schema for operations like parsing or transformation. Return a string representation of the schema in the DataType JSON format.

pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) โ†’ Column

from pyspark.sql.functions import schema_of_json, col

# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'), 
        ("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]

df = spark.createDataFrame(data, columns)

# Infer schema from JSON column
schema = df.select(schema_of_json(col("json_data"))).first()[0]


struct<name:string,age:int,skills:array<string>>

PySpark expr() is a SQL function to execute SQL-like expressions and to use an existing DataFrame column value as an expression argument to Pyspark built-in functions.

expr ()
#Using CASE WHEN similar to SQL.
df2=df.withColumn("gender", \
expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END") \
)
+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

#Add Month value from another column
df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+

PySpark SQL functions lit() and typedLit() are used to add a new column to DataFrame by assigning a literal or constant value. Both these functions return Column type as return type. typedLit() provides a way to be explicit about the data type of the constant value being added to a DataFrame,

lit ()
from pyspark.sql.functions import col,lit
df.select(col("EmpId"),col("Salary"),lit("1").alias("lit_value1"))
df.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|  111| 50000|         1|
|  222| 60000|         1|
|  333| 40000|         1|
+-----+------+----------+
typedLit ()

PySpark SQL functions lit() and typedLit() are used to add a new column to DataFrame by assigning a literal or constant value. typedLit() provides a way to be explicit about the data type of the constant value being added to a DataFrame

df4 = df4.withColumn("lit_value3", typedLit("flag", StringType()))
df4.show(truncate=False)

Difference between lit() and typedLit() is that the typedLit() function can handle collection types e.g.: Array, Dictionary(map), etc. Below is an example usage of typedLit()


Stack ( )

Stack ( ) function is used to transform columns into rows. It’s particularly useful when you have a wide DataFrame (with many columns) and want to “unpivot” or “melt” it into a longer format.

Syntax

stack(n: Int, exprs: String*): Column

Parameters

  • n: The number of rows to create per input row. Each set of n expressions in exprs corresponds to a new row.
  • exprs: A sequence of column-value pairs, typically specified as strings in the format "column_name, column_value".
+---+---+---+---+
| id|  A|  B|  C|
+---+---+---+---+
|  1|100|200|300|
|  2|400|500|600|
+---+---+---+---+

# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
    "id",
    "stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|       A|  100|
|  1|       B|  200|
|  1|       C|  300|
|  2|       A|  400|
|  2|       B|  500|
|  2|       C|  600|
+---+--------+-----+

The StructType and StructField classes in PySpark are used to specify the custom schema to the DataFrame and create complex columns like nested struct, array, and map columns.  StructType is a collection of StructField objects that define column name, column data type, boolean to specify if the field can be nullable or not, and metadata.

StructType & StructField

Simple STructType and StructField

# Simple STructType and StructField
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark . createDataFrame(data=data, schema=schema)

Nested StructType object struct

# Defining schema using nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

When ()

When () is similar to SQL and programming languages.

from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
   when(dfc.color == "F", "almost colorless")
  . when(dfc.color == "D", "colorless")
  . when(dfc.color == "G", "indistinguishable colorless")
  . when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
  .otherwise ("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color                  |
+-----+---------------------------+
|F    |almost colorless           |
|E    |very colorful              |
|D    |colorless                  |
|G    |indistinguishable colorless|
|J    |almost colorless           |
|I    |almost colorless           |
|H    |very colorful              |
|K    |very colorful              |
|L    |very colorful              |
+-----+---------------------------+

df3 = df.withColumn("new_gender", expr(
     "CASE WHEN gender = 'M' THEN 'Male' " + 
          "WHEN gender = 'F' THEN 'Female' 
           WHEN gender IS NULL THEN ''" +
          "ELSE gender END"))

df.createOrReplaceTempView("EMP")
spark.sql("select name, 
     CASE WHEN gender = 'M' THEN 'Male' " + 
         "WHEN gender = 'F' THEN 'Female' 
          WHEN gender IS NULL THEN ''" +
         "ELSE gender END as new_gender from EMP").show()

attention: above 2 example segments plus one by one

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

deltaTable vs DataFrames

In Databricks and PySpark, DeltaTables and DataFrames both handle structured data but differ in functionality and use cases. Here’s a detailed comparison:

Definitions

DeltaTable

A DeltaTable is a storage format based on Apache Parquet, with support for ACID transactions, versioning, schema enforcement, and advanced file operations. It is managed by the Delta Lake protocol, offering features like time travel, upserts, and deletion.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is an abstraction for structured and semi-structured data in Spark. It is a purely in-memory abstraction and does not directly manage storage or transactions.

Features

FeatureDeltaTableDataFrame
PersistenceStores data on disk in a managed format.Primarily in-memory abstraction (ephemeral).
Schema EnforcementEnforces schema when writing/updating.No schema enforcement unless explicitly specified.
ACID TransactionsSupports atomic writes, updates, and deletes.Not transactional; changes require reprocessing.
VersioningMaintains historical versions (time travel).No versioning; a snapshot of data.
Upserts and DeletesSupports MERGE, UPDATE, and DELETE.Does not directly support these operations.
PerformanceOptimized for storage (Z-order indexing, compaction).Optimized for in-memory transformations.
Time TravelQuery historical data using snapshots.No time travel support.
IndexingSupports indexing (Z-order, data skipping).No indexing capabilities.

Use Cases

DeltaTable

Ideal for persistent storage with advanced capabilities:

  • Data lakes or lakehouses.
  • ACID-compliant operations (e.g., MERGE, DELETE).
  • Time travel to access historical data.
  • Optimizing storage with compaction or Z-ordering.
  • Schema evolution during write operations.

DataFrame

Best for in-memory processing and transformations:

  • Ad-hoc queries and ETL pipelines.
  • Working with data from various sources (files, databases, APIs).
  • Temporary transformations before persisting into Delta or other formats.

Common APIs

DeltaTable

Load Delta table from a path:

from delta.tables import DeltaTable 
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

Merge data:

delta_table.alias("target").merge( 
source_df.alias("source"), 
"target.id = source.id" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Time Travel:

df = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta/table")

Optimize

OPTIMIZE '/path/to/delta/table' ZORDER BY (column_name);

DataFrame

Read

df = spark.read.format("parquet").load("/path/to/data")

Transformations

transformed_df = df.filter(df.age > 30).groupBy("gender").count()

Write

df.write.format("delta").save("/path/to/save")

Transition Between DeltaTables and DataFrames

Convert DeltaTable to DataFrame:

df = delta_table.toDF()

Write DataFrame to Delta format:

df.write.format("delta").save("/path/to/delta/table")

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)

Summary of Dataframe Methods

Summary of Dataframe Methods

CategoryMethodExample
InspectionprintSchema()df.printSchema()
columnsdf.columns
Selectionselect()df.select("col1", "col2").show()
withColumn()df.withColumn("new_col", col("col1") + 1)
withColumnRenamed()df.withColumnRenamed("old", "new")
distinct()df.select(“cut”).distinct().show()
take(5) df.take(5) # Retrieve the first 5 rows.
drop( )df.drop(‘col’).show() #drop col
Filteringfilter()
where ( )
df.filter(df.col1 > 10).show()
df . where (df.col1 > 10) . show()
AggregationsgroupBy().agg()df.groupBy("col").agg(sum("val")).show()
count()df.count()
Joinsjoin()df1.join(df2, df1.id == df2.id, "inner")
Left Joindf1.join(df2, df1.id == df2.id, “left_outer”).show()
SortingorderBy()
sort( )
df.orderBy("col1").show()
df.sort(df.col1.desc()).show( )
Null Handlingdropna(), fillna()df.fillna({"col1": 0}).show()
isNotNull( )df.filter(col(‘cut’).isNotNull( )).show( )
isNull( )df.filter(col(‘cut’).isNull( )).show()
dropna()df.dropna(subset=[“col1”]).show()
Date/Timeyear(), month(), dayofmonth()df.withColumn("year", year("date_col"))
Writingwrite.format()df.write.csv(“path/to/csv”, header=True) df.write.json(“path/to/json”)
Save as Tablewrite.format().mode( ).saveAsTable( )df.write.format(“delta”).saveAsTable(“my_table”)
Create as ViewcreateOrReplaceTempView( )df.createOrReplaceTempView(“temp_view_name”)
createOrReplaceGlobalTempView( )df.createOrReplaceGlobalTempView(“global_temp_view_name”)
String Opsupper(), concat()df.withColumn("upper", upper("col"))
PartitioningpartitionBy(“col”)f.write.partitionBy(“department”).parquet(“output/parquet_data”)
repartition(4)df.repartition(4).show() # Repartition into 4 partitions.
coalesce(2)df.coalesce(2).show() # Reduce to 2 partitions.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account ๐Ÿ˜Š)