Introducing Fabric

What is Fabric?

Microsoft Fabric is an all-in-one, SaaS (Software as a Service) analytics platform. It combines data movement, data engineering, data science, and business intelligence in a single product. It is built on OneLake, which is like “OneDrive for data.”

Why use Microsoft Fabric?

Traditional data stacks are “fragmented”: you might use Azure Data Factory for moving data, Databricks for cleaning it, a data lake for storing it, and Power BI for visualizing it.

Fabric fixes this by putting everything in one place.

  • Unified Data: Every tool (SQL, Spark, Power BI) uses the same copy of data in OneLake. No more duplicating data.
  • SaaS Simplicity: You don’t need to manage servers, clusters, or storage accounts. It’s all managed by Microsoft.
  • Direct Lake Mode: Power BI can read data directly from the lake without “importing” it, making reports incredibly fast.
  • Cost Efficiency: You pay for one pool of “Compute Capacity” and share it across all your teams.

What can Microsoft Fabric do?

It handles the entire data journey:

  • Ingest: Pull data from anywhere (SQL, AWS, Web).
  • Store: Store massive amounts of data in an open format (Delta Parquet).
  • Process: Clean and transform data using Python/Spark or SQL.
  • Analyze: Run complex queries and train Machine Learning models.
  • Visualize: Build real-time dashboards in Power BI.

Components of Fabric

Fabric is divided into “Experiences” based on what you need to do:

| Component (Experience)   | Role             | Purpose                                          |
|--------------------------|------------------|--------------------------------------------------|
| Data Factory             | Data Engineer    | ETL, Pipelines, and Dataflows.                   |
| Synapse Data Engineering | Spark Developer  | High-scale processing using Notebooks.           |
| Synapse Data Warehouse   | SQL Developer    | Professional-grade SQL storage.                  |
| Synapse Data Science     | Data Scientist   | Building AI and ML models.                       |
| Real-Time Intelligence   | IoT / App Dev    | High-speed streaming data.                       |
| Power BI                 | Business Analyst | Visualizing data for the business.               |
| Data Activator           | Operations       | Automatic alerts (e.g., “Email me if sales drop”). |

Step-by-Step: Getting Started

If you were a Data Engineer working on a project today, your workflow would look like this:

Step 1: Create a Workspace

  • Log in to Fabric. Create a Workspace.
  • Ensure it has a Fabric Capacity license attached.

Step 2: The Lakehouse (Bronze)

  • Create a Lakehouse called “Company_Data_Lake”.
  • This creates a folder in OneLake where your raw files will live.

Step 3: Data Pipeline (Ingestion)

  • Use Data Factory to create a pipeline.
  • >> Data Pipeline | Source: SQL Server | Destination: Lakehouse Files | 03/2026
  • This pulls your raw data into the “Files” folder.

Step 4: Notebook (Silver/Gold)

  • Open a Notebook. Use PySpark to clean the data.
  • Example: Remove duplicates, fix date formats.
  • Save the result as a Table (Delta format).

Step 5: Power BI (Visualization)

  • Switch to your SQL Analytics Endpoint.
  • Click New Report. Power BI will use Direct Lake to show your data instantly.

Appendix

Microsoft Fabric fundamentals documentation

Technical Interview Questions and Answers

The Job Interview is the single most critical step in the job hunting process. It is the definitive arena where all of your abilities must integrate. The interview itself is not a skill you possess; it is the moment you deploy your Integrated Skill Set, blending:

  1. Hard Skills (Technical Mastery): Demonstrating not just knowledge of advanced topics, but the depth of your expertise and how you previously applied it to solve complex, real-world problems.
  2. Soft Skills (Communication & Presence): Clearly articulating strategy, managing complexity under pressure, and exhibiting the leadership presence expected of senior-level and expert-level candidates.
  3. Contextual Skills (Business Acumen): Framing your solutions within the company’s business goals and culture, showing that you understand the strategic impact of your work.

This Integrated Skill represents your first real opportunity to sell your strategic value to the employer.

What is Azure Data Factory?

Azure Data Factory is a cloud-based data integration service used to create data-driven workflows for orchestrating and automating data movement and transformation across different data stores and compute services.

Key capabilities

  • Data ingestion
  • Data orchestration
  • Data transformation
  • Scheduling and monitoring
What are the core components of ADF?

The main components are:

| Component           | Purpose                                       |
|---------------------|-----------------------------------------------|
| Pipeline            | Logical grouping of activities                |
| Activity            | Single task in a pipeline                     |
| Dataset             | Data structure pointing to data               |
| Linked Service      | Connection to external resources              |
| Trigger             | Schedule or event that starts a pipeline      |
| Integration Runtime | Compute infrastructure used to run activities |

What is a Pipeline?

A pipeline is a logical grouping of activities that together perform a task such as data ingestion or transformation.

What is Integration Runtime (IR)?

Integration Runtime is the compute infrastructure used by ADF to perform data integration tasks.

Types:

  • Azure IR: Fully managed compute in Azure
  • Self-hosted IR: Runs on on-premises machines
  • Azure-SSIS IR: Used to run SSIS packages

What is a Linked Service?

A linked service defines the connection information needed for ADF to connect to external resources, for example:

  • Azure SQL Database
  • Data Lake
  • Azure Databricks
  • On-premises database

What is a Dataset?

A dataset represents the structure of data within a data store.

Difference between Pipeline and Data Flow

| Feature | Pipeline             | Data Flow             |
|---------|----------------------|-----------------------|
| Purpose | Orchestration        | Data transformation   |
| Compute | Orchestration engine | Spark cluster         |
| UI      | Activity workflow    | Visual transformation |

How do you handle incremental loading?

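Solution 1: Watermark column
Track a column such as last_modified. After each run, store MAX(last_modified) as the watermark; the next run copies only the rows that match
WHERE last_modified > last_run_time

Solution 2: CDC (Change Data Capture)
Read changes from the transaction log, or use change tracking on the source database.

Solution 3: File-based incremental
Partition folders by date and load only the folders that have not been processed yet.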
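
To make the watermark approach (Solution 1) concrete, here is a minimal sketch in plain Python. It uses an in-memory SQLite table as a stand-in for the source system; the table name `source_orders`, its columns, and the helper `load_increment` are assumptions made up for illustration, not ADF objects. In ADF the same idea is usually built from a lookup of the stored watermark, a filtered copy, and an update of the watermark.

```python
import sqlite3


def load_increment(conn, last_watermark):
    """Return the rows changed since last_watermark and the new watermark value."""
    rows = conn.execute(
        "SELECT id, name, last_modified FROM source_orders "
        "WHERE last_modified > ? ORDER BY last_modified",
        (last_watermark,),
    ).fetchall()
    # The new watermark is the largest last_modified seen in this batch;
    # if nothing changed, keep the previous watermark.
    new_watermark = max((row[2] for row in rows), default=last_watermark)
    return rows, new_watermark


# Hypothetical source table with ISO-8601 timestamps (they sort correctly as text).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_orders (id INTEGER, name TEXT, last_modified TEXT)")
conn.executemany(
    "INSERT INTO source_orders VALUES (?, ?, ?)",
    [
        (1, "a", "2024-01-01T00:00:00"),
        (2, "b", "2024-01-02T00:00:00"),
        (3, "c", "2024-01-03T00:00:00"),
    ],
)

# First run picks up everything after the stored watermark.
first_batch, watermark = load_increment(conn, "2024-01-01T12:00:00")
# A second run with the updated watermark finds nothing new.
second_batch, watermark_after = load_increment(conn, watermark)
```

The important design point is that the watermark is advanced only after a batch has been read, so a failed run can simply be repeated with the old value.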

How do you implement error handling?

Try-Catch pattern
Failure path
Retry policy

Activity
├ success → next step
└ failure → error handling pipeline
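
The retry policy and failure path above can be sketched in plain Python. This is only an analogy for how an activity with a retry count and an on-failure branch behaves; `run_with_retry`, `flaky_copy`, and `always_fails` are hypothetical names, not part of any ADF SDK.

```python
import time


def run_with_retry(activity, retries=3, delay_seconds=0.0, on_failure=None):
    """Run activity(); retry on any exception; call on_failure after the last failed attempt."""
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return activity()  # success path: hand the result to the next step
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                time.sleep(delay_seconds)  # wait before the next attempt
    if on_failure is not None:
        on_failure(last_error)  # failure path: e.g. log the error or send an alert
    return None


attempts = {"count": 0}


def flaky_copy():
    """Hypothetical activity that fails twice and then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "copied"


def always_fails():
    """Hypothetical activity that never succeeds."""
    raise ValueError("bad source")


errors = []
result = run_with_retry(flaky_copy, retries=3, on_failure=errors.append)
failed_result = run_with_retry(always_fails, retries=2, on_failure=errors.append)
```

Here the first call recovers through retries alone, while the second exhausts its retries and ends on the failure path.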

What are triggers in ADF?

Triggers are used to automatically start pipelines.

| Trigger          | Purpose                |
|------------------|------------------------|
| Schedule trigger | Time-based execution   |
| Tumbling window  | Time-based incremental |
| Event trigger    | Storage events         |

What is Tumbling Window Trigger?

A tumbling window trigger runs pipelines at fixed time intervals and processes data in discrete time windows.
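
A small sketch may help show what "discrete time windows" means: the windows are fixed-size, contiguous, and never overlap, so each slice of data is processed once. The function name `tumbling_windows` is made up for illustration and is not an ADF API.

```python
from datetime import datetime, timedelta


def tumbling_windows(start, end, interval):
    """Split [start, end) into back-to-back windows of equal length."""
    windows = []
    window_start = start
    while window_start + interval <= end:
        windows.append((window_start, window_start + interval))
        window_start += interval  # the next window starts where this one ends
    return windows


windows = tumbling_windows(
    datetime(2024, 1, 1, 0),
    datetime(2024, 1, 1, 3),
    timedelta(hours=1),
)
for window_start, window_end in windows:
    print(window_start.isoformat(), "->", window_end.isoformat())
```

Each pipeline run would receive one (start, end) pair and load only the data that falls inside it.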

How do you parameterize pipelines?

ADF supports parameters at:

  • Linked Service
  • Pipeline
  • Dataset

What are common ADF performance optimizations?

Parallel copy: run copy activities in parallel to raise throughput
Staging: use blob staging
Partitioning: split large tables

How do you monitor pipelines?

Monitoring options:
ADF Monitor UI
Azure Monitor
Log Analytics

Typical metrics to track: pipeline success, pipeline duration, failure rate

Describe the data storage options available in Databricks.

Databricks offers several ways to store data. First, there’s the Databricks File System for storing and managing files. Then, there’s Delta Lake, an open-source storage layer that adds ACID transactions to Apache Spark, making it more reliable. Databricks also integrates with cloud storage services like AWS S3, Azure Blob Storage, and Google Cloud Storage. Plus, you can connect to a range of external databases, both relational and NoSQL, using JDBC.

What is Databricks Delta (Delta Lakehouse) and how does it enhance the capabilities of Azure Databricks?

Databricks Delta, now known as Delta Lake, is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It enhances Azure Databricks by providing features like:

  • ACID transactions for data reliability and consistency.
  • Scalable metadata handling for large tables.
  • Time travel for data versioning and historical data analysis.
  • Schema enforcement and evolution.
  • Improved performance with data skipping and Z-ordering.

Are there alternative solutions similar to Delta Lake?

Several alternative technologies provide Delta Lake-style lakehouse capabilities (ACID + schema enforcement + time travel + scalable storage + SQL engine), such as:

  • Apache Iceberg
  • Apache Hudi
  • Snowflake (Iceberg Tables / Unistore)
  • BigQuery + BigLake
  • AWS Redshift + Lake Formation + Apache Iceberg
  • Microsoft Fabric (OneLake + Delta/DQ/DLTS)

What is a Delta Lake Table?

Delta Lake tables are tables that store data in the Delta format. Delta Lake is an extension to existing data lakes.

What is Delta Live Table?

Delta Live Tables (DLT) is a framework in Azure Databricks for building reliable, automated, and scalable data pipelines using Delta Lake tables.

It simplifies ETL development by managing data dependencies, orchestration, quality checks, and monitoring automatically.

Explain how you can use Databricks to implement a Medallion Architecture (Bronze, Silver, Gold).

  1. Bronze Layer (Raw Data): Ingest raw data from various sources into the Bronze layer. This data is stored as-is, without any transformation.
  2. Silver Layer (Cleaned Data, also known as the Enriched layer): Clean and enrich the data from the Bronze layer. Apply transformations, data cleansing, and filtering to create more refined datasets.
  3. Gold Layer (Aggregated Data, also known as the Curated layer): Aggregate and further transform the data from the Silver layer to create high-level business tables or machine learning features. This layer is used for analytics and reporting.
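
The three layers can be illustrated with a tiny, library-free sketch. In Databricks each step would normally read and write Delta tables with PySpark; here plain Python lists stand in for the tables, and the sample rows and the helper names `to_silver` and `to_gold` are invented for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Bronze: raw records exactly as they arrived (strings, duplicates, bad values).
bronze = [
    {"order_id": "1", "customer": " alice ", "amount": "10.5", "order_date": "2024/01/05"},
    {"order_id": "1", "customer": " alice ", "amount": "10.5", "order_date": "2024/01/05"},
    {"order_id": "2", "customer": "Bob", "amount": "4.5", "order_date": "2024/01/05"},
    {"order_id": "3", "customer": "alice", "amount": "n/a", "order_date": "2024/01/06"},
]


def to_silver(rows):
    """Deduplicate by order_id, drop rows with unparsable amounts, fix types and date format."""
    seen, silver = set(), []
    for row in rows:
        if row["order_id"] in seen:
            continue  # duplicate record
        try:
            amount = float(row["amount"])
        except ValueError:
            continue  # amount cannot be parsed; skip the row
        seen.add(row["order_id"])
        silver.append({
            "order_id": int(row["order_id"]),
            "customer": row["customer"].strip().title(),
            "amount": amount,
            "order_date": datetime.strptime(row["order_date"], "%Y/%m/%d").date().isoformat(),
        })
    return silver


def to_gold(rows):
    """Aggregate cleaned orders into total sales per day."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["order_date"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
```

Bronze keeps everything, Silver holds two clean orders, and Gold holds one business-level number per day.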
What Is Z-Order (Databricks / Delta Lake)?

Z-Ordering in Databricks (specifically for Delta Lake tables) is an optimization technique designed to co-locate related information into the same set of data files on disk.

OPTIMIZE mytable
ZORDER BY (col1, col2);
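
To see why Z-ordering co-locates related values, it helps to look at the classic Z-order (Morton) curve the technique is named after. The sketch below is a conceptual illustration only and makes no claim about how Databricks implements `ZORDER BY` internally; `z_value` and the 4x4 grid of points are assumptions for the example.

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y into a single Z-order (Morton) value."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # bits of y go to odd positions
    return z


# Sixteen rows with two columns (x, y), each taking values 0..3.
points = [(x, y) for x in range(4) for y in range(4)]

# Sort the rows along the Z-order curve and cut them into "files" of four rows each.
ordered = sorted(points, key=lambda p: z_value(*p))
files = [ordered[i:i + 4] for i in range(0, len(ordered), 4)]

for number, rows in enumerate(files):
    print(f"file {number}: {rows}")
```

Each file ends up covering a narrow range of both columns, so a filter on either column can skip most files. Sorting by one column alone would keep only that column's ranges narrow.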

What Is Liquid Clustering (Databricks)?

Liquid Clustering is Databricks’ next-generation data layout optimization for Delta Lake.
It is designed to replace both table partitioning and Z-Order, and the clustering keys can be changed later without rewriting existing data.

-- At creation time:
CREATE TABLE sales
CLUSTER BY (customer_id, event_date)
AS SELECT * FROM source;

-- For existing tables:
ALTER TABLE sales
CLUSTER BY (customer_id, event_date);

-- Trigger the actual clustering:
OPTIMIZE sales;

-- Remove clustering:
ALTER TABLE sales
CLUSTER BY NONE;

What is a Dataframe, RDD, Dataset in Azure Databricks?

A DataFrame is a distributed collection of data organized into rows and named columns, much like a table. It is the structure Databricks most commonly uses to hold data at runtime.

RDD, Resilient Distributed Dataset, is a fault-tolerant, immutable collection of elements partitioned across the nodes of a cluster. RDDs are the basic building blocks that power all of Spark’s computations.

Dataset is an extension of the DataFrame API that provides compile-time type safety and object-oriented programming benefits. It is available in Scala and Java; PySpark works with DataFrames only.

What is caching and its types?

A cache is a temporary storage that holds frequently accessed data, aiming to reduce latency and enhance speed. Caching involves the process of storing data in cache memory.

What is Spark Cache / Persist (Memory Cache)

This is the standard Apache Spark feature. It stores data in the JVM Heap (Memory).

What is Databricks Disk Cache (Local Disk)

This is a Databricks-specific optimization. It automatically stores copies of remote files (Parquet/Delta) on the local NVMe SSDs of the worker nodes.

Comparing .cache() and .persist()

Both .cache() and .persist() save intermediate results so that Spark does not recompute the entire lineage. The fundamental difference is that .cache() is a pre-configured version of .persist().

.cache(): Shorthand for .persist() with the default storage level. For RDDs the default is StorageLevel.MEMORY_ONLY; for DataFrames and Datasets it is MEMORY_AND_DISK, so partitions that do not fit in memory spill to disk.

.persist(): The more flexible version. It lets you pass a StorageLevel to decide exactly how and where the data should be stored (RAM, disk, or both).

How do you optimize Databricks?

When optimizing Databricks workloads, I focus on several layers. First, I optimize data layout using partitioning and Z-ordering on Delta tables. Second, I improve Spark performance by using broadcast joins, filtering data early, and caching intermediate results. Third, I tune cluster resources such as autoscaling and Photon engine. Finally, I run Delta maintenance commands like OPTIMIZE and VACUUM to manage small files and improve query performance.

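Data Layout Optimization:
Partitioning:
CREATE TABLE sales (customer_id BIGINT, amount DOUBLE, date DATE)  -- illustrative columns
USING DELTA
PARTITIONED BY (date);

Z-Ordering:
OPTIMIZE sales
ZORDER BY (customer_id);

Liquid Clustering:
-- An alternative to partitioning and Z-Ordering, not combined with them
ALTER TABLE sales
CLUSTER BY (customer_id);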

What is Photon Engine?

Photon is a high-performance query engine built in C++ that accelerates SQL queries and data processing workloads in Azure Databricks. It improves performance by using vectorized processing and optimized execution for modern hardware.

How would you secure and manage secrets in Azure Databricks when connecting to external data sources?

  1. Use Azure Key Vault to store and manage secrets securely.
  2. Integrate Azure Key Vault with Azure Databricks through an Azure Key Vault-backed secret scope (or keep the secrets in a Databricks-backed scope).
  3. Access secrets in notebooks and jobs using the dbutils.secrets API.
    dbutils.secrets.get(scope="<scope-name>", key="<key-name>")
  4. Ensure that secret access policies are strictly controlled and audited.

Scenario: You need to implement a data governance strategy in Azure Databricks. What steps would you take?

  • Data Classification: Classify data based on sensitivity and compliance requirements.
  • Access Controls: Implement role-based access control (RBAC) using Azure Active Directory.
  • Data Lineage: Use tools like Databricks Lineage to track data transformations and movement.
  • Audit Logs: Enable and monitor audit logs to track access and changes to data.
  • Compliance Policies: Implement Azure Policies and Azure Purview for data governance and compliance monitoring.

Scenario: You need to optimize a Spark job that has a large number of shuffle operations causing performance issues. What techniques would you use?

  1. Repartitioning: Repartition the data to balance the workload across nodes and reduce skew.
  2. Broadcast Joins: Use broadcast joins for small datasets to avoid shuffle operations.
  3. Caching: Cache intermediate results to reduce the need for recomputation.
  4. Shuffle Partitions: Tune the number of shuffle partitions (spark.sql.shuffle.partitions) so that the workload is distributed evenly without creating many tiny tasks.
  5. Skew Handling: Identify and handle skewed data by adding salt keys or custom partitioning strategies.
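
The salting idea from point 5 can be shown without Spark. The sketch below simulates a skewed key column in plain Python: one "hot" key owns most of the rows, and appending a small salt splits it into several smaller groups that can be shuffled to different partitions. The key names, `NUM_SALTS`, `NUM_PARTITIONS`, and the CRC32-based partitioner are assumptions for illustration; Spark's real partitioner differs.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 4
NUM_SALTS = 4


def partition_for(key):
    """Toy hash partitioner; crc32 is stable across runs, unlike hash() on strings."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


# 1,000 rows for one hot customer plus 10 rows each for 20 other customers.
keys = ["hot_customer"] * 1000 + [f"customer_{i}" for i in range(20) for _ in range(10)]

# Without salting, all rows of the hot key form one group that lands on one partition.
largest_group_before = max(Counter(keys).values())

# With salting, each row gets a suffix 0..NUM_SALTS-1, splitting the hot key into smaller groups.
salted_keys = [f"{key}#{i % NUM_SALTS}" for i, key in enumerate(keys)]
largest_group_after = max(Counter(salted_keys).values())

partition_sizes_before = Counter(partition_for(k) for k in keys)
partition_sizes_after = Counter(partition_for(k) for k in salted_keys)
print("largest key group:", largest_group_before, "->", largest_group_after)
print("partition sizes before:", dict(partition_sizes_before))
print("partition sizes after: ", dict(partition_sizes_after))
```

In a real join, the other side has to be expanded with every salt value so that the salted keys still match, which is the price paid for the more even distribution.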
Scenario: You need to migrate an on-premises Hadoop workload to Azure Databricks. Describe your migration strategy.
  • Assessment: Evaluate the existing Hadoop workloads and identify components to be migrated.
  • Data Transfer: Use Azure Data Factory or Azure Databricks to transfer data from on-premises HDFS to ADLS.
  • Code Migration: Convert Hadoop jobs (e.g., MapReduce, Hive) to Spark jobs and test them in Databricks.
  • Optimization: Optimize the Spark jobs for performance and cost-efficiency.
  • Validation: Validate the migrated workloads to ensure they produce the same results as on-premises.
  • Deployment: Deploy the migrated workloads to production and monitor their performance.

What are Synapse SQL Workspaces and how are they used?

Synapse SQL Workspaces are the environments within Azure Synapse Analytics where users can perform data querying and management tasks. They include:

  • Provisioned SQL Pools (now called dedicated SQL pools): Used for large-scale, high-performance data warehousing. Users can create and manage databases, tables, and indexes, and run complex queries.
  • On-Demand SQL Pools (now called serverless SQL pools): Allow users to query data directly from Azure Data Lake without creating a dedicated data warehouse. They are ideal for interactive and exploratory queries.

What is the difference between On-Demand SQL Pool and Provisioned SQL Pool?

The primary difference between On-Demand SQL Pool and Provisioned SQL Pool lies in their usage and scalability:

  • On-Demand SQL Pool: Allows users to query data stored in Azure Data Lake without requiring a dedicated resource allocation. It is best for ad-hoc queries and does not incur costs when not in use. It scales automatically based on query demand.
  • Provisioned SQL Pool: Provides a dedicated set of resources for running data warehousing workloads. It is optimized for performance and can handle large-scale data operations. Costs are incurred based on the provisioned resources and are suitable for predictable, high-throughput workloads.

How does Azure Synapse Analytics handle data integration?

Azure Synapse Analytics handles data integration through Synapse Pipelines, which is a data integration service built on Azure Data Factory. It enables users to:

  • Ingest Data: Extract data from various sources, including relational databases, non-relational data stores, and cloud-based services.
  • Transform Data: Use data flows and data wrangling to clean and transform data.
  • Orchestrate Workflows: Schedule and manage data workflows, including ETL (Extract, Transform, Load) processes.
  • Data Integration Runtime: Utilizes Azure Integration Runtime for data movement and transformation tasks.

Can you explain the concept of “Dedicated SQL Pool” in Azure Synapse Analytics?

Dedicated SQL Pool is a provisioned, high-performance relational database.

  • Data Storage: Data must be ingested and stored internally in a proprietary columnar format. It follows a Schema-on-Write approach.
  • Architecture: Uses MPP (Massively Parallel Processing) architecture. Data is sharded into 60 distributions and processed by multiple compute nodes in parallel.
  • Cost: Billed by the hour based on the provisioned DWUs. You can pause it when not in use to save costs.
  • Best For: Stable production reporting, TB/PB scale enterprise data warehousing, and high-concurrency queries needing sub-second response.

What is Serverless SQL Pool?

Serverless SQL Pool is an on-demand, compute-only query engine with no internal storage.

  • Data Location: It does not store data. Data remains in the Data Lake (ADLS Gen2) in open formats like Parquet, CSV, or JSON.
  • Mechanism: Uses the OPENROWSET function to query lake files directly. It follows a Schema-on-Read approach.
  • Cost: Billed per query based on data scanned (approx. $5 USD per TB). Cost is $0 if no queries are run.
  • Best For: Rapid data discovery, building a Logical Data Warehouse, and ad-hoc data validation.
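
Because billing is based on data scanned, a rough cost estimate is simple arithmetic. The sketch below uses the approximate $5 per TB figure quoted above and assumes 1 TB = 1024^4 bytes; both are assumptions for illustration, so check the current price list before relying on the numbers. `estimate_query_cost` is a made-up helper name.

```python
PRICE_PER_TB_USD = 5.0  # approximate figure from the text above; verify current pricing


def estimate_query_cost(bytes_scanned, price_per_tb=PRICE_PER_TB_USD):
    """Estimate the cost of one query from the number of bytes it scans."""
    tb_scanned = bytes_scanned / 1024**4  # assumes binary terabytes
    return tb_scanned * price_per_tb


# A query that scans 200 GB of Parquet files.
cost_200_gb = estimate_query_cost(200 * 1024**3)
print(f"Estimated cost: ${cost_200_gb:.4f}")
```

This is also why columnar formats and partition pruning matter here: scanning fewer bytes directly lowers the bill.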
What is a Synapse Spark Pool, and when would you use it?

It is a managed Apache Spark 3 pool that you create and configure inside an Azure Synapse workspace.

  • Managed Cluster: You don’t manage servers; you just select the node size and the number of nodes.
  • Auto-Scale & Auto-Pause: It automatically scales nodes based on workload and pauses after a configurable idle period (for example, 5 minutes) to save costs.
  • Language Support: Supports PySpark (Python), Spark SQL, Scala, and .NET.

Can you talk about database locking?

Database locking is the mechanism a database uses to control concurrent access to data so that transactions stay consistent, isolated, and safe.

Locking prevents:

  • Dirty reads
  • Lost updates
  • Write conflicts
  • Race conditions

Types of Locks:

1. Shared Lock (S)

  • Used when reading data
  • Multiple readers allowed
  • No writers allowed

2. Exclusive Lock (X)

  • Used when updating or inserting
  • No one else can read or write the locked item

3. Update Lock (U) (SQL Server specific)

  • Prevents deadlocks when upgrading from Shared → Exclusive
  • Only one Update lock allowed

4. Intention Locks (IS, IX, SIX)

Used at table or page level to signal a lower-level lock is coming.

5. Row / Page / Table Locks

Based on granularity:

  • Row-level: Most common, best concurrency
  • Page-level: Several rows together
  • Table-level: When scanning or modifying large portions

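DB engines can automatically escalate many fine-grained locks to a coarser lock when a transaction holds too many of them:

Row / Page → Table

The exact path depends on the engine. SQL Server, for example, escalates row or page locks directly to a table (or partition) lock.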

Can you talk about deadlocks?

A deadlock happens when:

  • Transaction A holds Lock 1 and wants Lock 2
  • Transaction B holds Lock 2 and wants Lock 1

Both wait on each other → neither can move → database detects → kills one transaction (“deadlock victim”).
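
Deadlock detection is commonly described as looking for a cycle in a "wait-for" graph: an edge from A to B means transaction A is waiting for a lock that B holds. The sketch below is a simplified illustration in which every transaction waits on at most one other; `find_deadlock` and the transaction names are hypothetical, and real engines use more elaborate bookkeeping.

```python
def find_deadlock(wait_for):
    """Return the list of transactions forming a cycle, or None if there is no cycle.

    wait_for maps each transaction to the transaction it is waiting on (or None).
    """
    for start in wait_for:
        path = []
        current = start
        # Follow the chain of waits until it ends or revisits a transaction.
        while current is not None and current not in path:
            path.append(current)
            current = wait_for.get(current)
        if current is not None:
            return path[path.index(current):]  # the part of the path that loops
    return None


# A holds Lock 1 and waits for B; B holds Lock 2 and waits for A.
deadlocked = find_deadlock({"A": "B", "B": "A"})
# A waits for B, but B is not waiting on anyone, so B will finish and release its lock.
no_deadlock = find_deadlock({"A": "B", "B": None})
# C is merely queued behind the A/B cycle and is not part of it.
cycle_only = find_deadlock({"C": "A", "A": "B", "B": "A"})
```

Once a cycle is found, the engine picks one transaction in it as the victim and rolls it back so the others can proceed.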

Deadlocks usually involve two writers, but can also involve readers depending on the isolation level.

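How to Troubleshoot Deadlocks?

A: In SQL Server: Enable Deadlock Graph Capture

Turn on the deadlock trace flags so that deadlock details are written to the SQL Server error log:
DBCC TRACEON (1222, -1);
DBCC TRACEON (1204, -1);

Optionally, control which session is preferred as the deadlock victim. This is a session-level setting, not a database option, and it does not capture anything by itself:
SET DEADLOCK_PRIORITY NORMAL;

B: Interpret the Deadlock Graph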

You will see:

  • Processes (T1, T2…)
  • Resources (keys, pages, objects)
  • Types of locks (X, S, U, IX, etc.)
  • Which statement caused the deadlock

Look for:

  • Two queries touching the same index/rows in different order
  • A scanning query locking too many rows
  • Missing indexes
  • Query patterns that cause U → X lock upgrades

C: Identify

  • The exact tables/indexes involved
  • The order of locking
  • The hotspot row or range
  • Rows with heavy update/contention

This will tell you what to fix.

How to Prevent Deadlocks (Practical + Senior-Level)
  • Always update rows in the same order
  • Keep transactions short
  • Use appropriate indexes
  • Use the correct isolation level
  • Avoid long reads before writes
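
The first rule, always touching resources in the same order, is easy to demonstrate with ordinary thread locks. The sketch below is an analogy in Python rather than database code: two workers call `transfer` with the locks in opposite argument order, yet no deadlock occurs because the function always acquires them in one global order. All names (`lock_a`, `lock_b`, `balances`, `transfer`) are made up for the example.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
balances = {"a": 100, "b": 100}


def transfer(first_lock, second_lock, src, dst, amount):
    """Move amount from src to dst while holding both locks."""
    # Acquire the locks in one global order (here: by id), regardless of argument order.
    ordered = sorted((first_lock, second_lock), key=id)
    with ordered[0]:
        with ordered[1]:
            balances[src] -= amount
            balances[dst] += amount


def worker_ab():
    for _ in range(1000):
        transfer(lock_a, lock_b, "a", "b", 1)


def worker_ba():
    for _ in range(1000):
        transfer(lock_b, lock_a, "b", "a", 1)


threads = [threading.Thread(target=worker_ab), threading.Thread(target=worker_ba)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join(timeout=10)
```

If each worker instead took its first argument's lock first, the two threads could each hold one lock and wait forever for the other, which is exactly the Transaction A / Transaction B pattern described earlier.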

Can you discuss database normalization and denormalization?

Normalization is the process of structuring a relational database to minimize data redundancy (duplicate data) and improve data integrity.

| Normal Form | Rule Summary | Problem Solved |
|-------------|--------------|----------------|
| 1NF (First) | Eliminate repeating groups; ensure all column values are atomic (indivisible). | Multi-valued columns, non-unique rows. |
| 2NF (Second) | Be in 1NF, AND all non-key attributes must depend on the entire primary key. | Partial Dependency (non-key attribute depends on part of a composite key). |
| 3NF (Third) | Be in 2NF, AND eliminate transitive dependency (non-key attribute depends on another non-key attribute). | Redundancy due to indirect dependencies. |
| BCNF (Boyce-Codd) | A stricter version of 3NF; every determinant (column that determines another column) must be a candidate key. | Edge cases involving multiple candidate keys. |

Denormalization is the process of intentionally introducing redundancy into a previously normalized database to improve read performance and simplify complex queries.

  • Adding Redundant Columns: Copying a value from one table to another (e.g., copying the CustomerName into the Orders table to avoid joining to the Customer table every time an order is viewed).
  • Creating Aggregate/Summary Tables: Storing pre-calculated totals, averages, or counts to avoid running expensive aggregate functions at query time (e.g., a table that stores the daily sales total).
  • Merging Tables: Combining two tables that are frequently joined into a single, wider table.
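
The trade-off can be seen in a few lines with an in-memory SQLite database. This is a minimal sketch with made-up tables (`customers`, `orders`, `orders_wide`): the normalized design needs a join to show the customer name, while the denormalized copy answers the same question from one table but goes stale when the source changes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Normalized: the customer name is stored exactly once.
CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, customer_name TEXT);
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(customer_id),
    amount REAL
);
INSERT INTO customers VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO orders VALUES (10, 1, 25.0), (11, 1, 15.0), (12, 2, 40.0);

-- Denormalized: the name is copied into every order row.
CREATE TABLE orders_wide AS
SELECT o.order_id, o.customer_id, c.customer_name, o.amount
FROM orders o JOIN customers c ON c.customer_id = o.customer_id;
""")

# Same answer, with and without a join.
normalized = conn.execute(
    "SELECT o.order_id, c.customer_name FROM orders o "
    "JOIN customers c ON c.customer_id = o.customer_id ORDER BY o.order_id"
).fetchall()
denormalized = conn.execute(
    "SELECT order_id, customer_name FROM orders_wide ORDER BY order_id"
).fetchall()

# The cost of redundancy: updating the source leaves the copies out of date.
conn.execute("UPDATE customers SET customer_name = 'Alice Smith' WHERE customer_id = 1")
stale_rows = conn.execute(
    "SELECT COUNT(*) FROM orders_wide WHERE customer_name = 'Alice'"
).fetchone()[0]
```

Denormalized tables therefore need a refresh or sync process, which is why they are usually reserved for read-heavy reporting layers.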

Markdown in a Databricks Notebook

Databricks Notebook Markdown is a special version of the Markdown language built directly into Databricks notebooks. It allows you to add richly formatted text, images, links, and even mathematical equations to your notebooks, turning them from just code scripts into interactive documents and reports.

Think of it as a way to provide context, explanation, and structure to your code cells, making your analysis reproducible and understandable by others (and your future self!).

Why is it Important?

Using Markdown cells effectively transforms your workflow:

  1. Documentation: Explain the purpose of the analysis, the meaning of a complex transformation, or the interpretation of a result.
  2. Structure: Create sections, headings, and tables of contents to organize long notebooks.
  3. Clarity: Add lists, tables, and links to data sources or external references.
  4. Communication: Share findings with non-technical stakeholders by narrating the story of your data directly alongside the code that generated it.

Key Features and Syntax with Examples

1. Headers (for Structure)

Use # to create different levels of headings.

%md
# Title (H1)
## Section 1 (H2)
### Subsection 1.1 (H3)
#### This is a H4 Header

Title (H1)

Section 1 (H2)

Subsection 1.1 (H3)

This is a H4 Header

2. Emphasis (Bold and Italic)

%md
*This text will be italic*
_This will also be italic_

**This text will be bold**
__This will also be bold__

**_You can combine them_**

This text will be italic This will also be italic

This text will be bold This will also be bold

You can combine them

3. Lists (Ordered and Unordered)

Unordered List:

%md
- Item 1
- Item 2
  - Sub-item 2.1
  - Sub-item 2.2

  • Item 1
  • Item 2
    • Sub-item 2.1
    • Sub-item 2.2

Ordered List:

%md
1. First item
2. Second item
   1. Sub-item 2.1
3. Third item

  1. First item
  2. Second item
    1. Sub-item 2.1
  3. Third item

4. Links and Images

Link

%md
[Databricks Website](https://databricks.com)

Image

%md
![mainri Inc. Logo](https://mainri.ca/wp-content/uploads/2024/08/Logo-15-trans.png)

5. Tables

%md
| Column 1 Header | Column 2 Header | Column 3 Header |
|-----------------|-----------------|-----------------|
| Row 1, Col 1    | Row 1, Col 2    | Row 1, Col 3    |
| Row 2, Col 1    | Row 2, Col 2    | Row 2, Col 3    |
| *Italic Cell*   | **Bold Cell**   | Normal Cell     |

Column 1 Header | Column 2 Header | Column 3 Header
Row 1, Col 1    | Row 1, Col 2    | Row 1, Col 3
Row 2, Col 1    | Row 2, Col 2    | Row 2, Col 3
Italic Cell     | Bold Cell       | Normal Cell

6. Code Syntax Highlighting (A Powerful Feature)

%md
```python
df = spark.read.table("samples.nyctaxi.trips")
display(df)
```

```sql
SELECT * FROM samples.nyctaxi.trips LIMIT 10;
```

```scala
val df = spark.table("samples.nyctaxi.trips")
display(df)
```

7. Mathematical Equations (LaTeX)

%md


$$
f(x) = \sum_{i=0}^{n} \frac{x^i}{i!}
$$

Summary

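| Feature      | Purpose                           | Example Syntax |
|--------------|-----------------------------------|----------------|
| Headers      | Create structure and sections     | ## My Section |
| Emphasis     | Add bold/italic emphasis          | **bold**, *italic* |
| Lists        | Create bulleted or numbered lists | - Item or 1. Item |
| Tables       | Organize data in a grid           | \| Header \| |
| Links/Images | Add references and visuals        | [Text](URL) |
| Code Blocks  | Display syntax-highlighted code   | three backticks, a language name (e.g. python), the code, three backticks |
| Math (LaTeX) | Render mathematical formulas      | $$E = mc^2$$ |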

In essence, Databricks Notebook Markdown is the narrative glue that binds your code, data, and insights together, making your notebooks powerful tools for both analysis and communication.

PySpark Data sources

PySpark supports a variety of data sources for structured and semi-structured data: file formats such as CSV, JSON, Parquet, ORC, and Avro; databases over JDBC; and table formats and catalogs such as Delta Lake and Hive.

Query Database Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
# Query table using jdbc()
# Parameters
server_name = "myserver.database.windows.net"
port = "1433"
database_name = "mydatabase"
table_name = "mytable"
username = "myusername"
password = "mypassword"

# Construct the JDBC URL
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Read data from the SQL database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
# show DataFrame
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

Query JDBC Table Parallel, specific Columns

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from emp where sex='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

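df.show()

Note: numPartitions on its own does not make the read parallel. Spark splits a JDBC read across partitions only when partitionColumn, lowerBound, and upperBound are also set, and those options cannot be combined with the query option (pass a subquery through dbtable instead).

Read & Write CSV File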

Read CSV File

df = spark.read.format("csv") \
    .options(delimiter=',') \
    .options(inferSchema='True') \
    .options(header='True') \
    .load(["/Folder/", "/filePath2/"])  # pass several paths as a list

Write CSV File

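# The writer hangs off the DataFrame (df.write), not the SparkSession.
# inferSchema applies only when reading, so it is dropped here.
df.write.format("csv") \
    .options(delimiter=',') \
    .options(header='True') \
    .mode("overwrite") \
    .save("/filePath/filename")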

If you try to write directly to a file name (e.g., name1.csv), Spark creates a directory with that name instead, because it writes a collection of part files rather than a single file.

path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

At this point, “dbfs:/FileStore/name1.csv” is still a directory rather than a file. PySpark writes the single file inside it with a generated name like part-00000-<id>.csv

dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

We need an additional step: rename the part file to name1.csv.

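# List all files in the output directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Find the part file, move it out, then replace the directory with it
for file in files:
    if file.name.startswith("part-"):
        tmp_file = "dbfs:/FileStore/name1_tmp.csv"  # temporary name outside the directory
        dbutils.fs.mv(file.path, tmp_file)
        # Remove the directory (and its _SUCCESS/_committed marker files)
        dbutils.fs.rm("dbfs:/FileStore/name1.csv", True)
        # Give the single CSV file its final name
        dbutils.fs.mv(tmp_file, "dbfs:/FileStore/name1.csv")
        break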


display(dbutils.fs.ls ( "dbfs:/FileStore/"))

Direct Write with Custom File Name

PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

Steps:

  1. Use coalesce(1) to combine all data into a single partition.
  2. Save the file to a temporary location.
  3. Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Read & Write Parquet File

sample data save at /tmp/output/people.parquet

dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet

read from parquet

df1=spark.read.parquet("/tmp/output/people.parquet")
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

Reading Parquet works the same way as reading CSV. Parquet files carry their own schema, so no header or inferSchema options are needed.

write to a parquet

  • append: appends the data from the DataFrame to the existing file, if the Destination files already exist. In Case the Destination files do not exists, it will create a new parquet file in the specified location.

    df.write.mode(“append”).parquet(“path/to/parquet/file”)
  • overwrite: This mode overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.

    df.write.mode(“overwrite”).parquet(“path/to/parquet/file”)
  • ignore: If the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.

    df.write.mode(“ignore”).parquet(“path/to/parquet/file”)
  • Create Parquet partition file

    df.write.partitionBy(“gender”,”salary”).mode(“overwrite”).parquet(“/tmp/output/people2.parquet”)
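
partitionBy writes one subdirectory per distinct value of each partition column, named column=value and nested in the order the columns are listed. The sketch below only builds the directory path a given row would land in; it is plain Python, not a Spark API, and the helper name partition_dir is hypothetical.

```python
from typing import Mapping, Sequence


def partition_dir(base_path: str, row: Mapping[str, object],
                  partition_cols: Sequence[str]) -> str:
    """Return the directory a row lands in under Hive-style partitioning."""
    segments = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([base_path.rstrip("/"), *segments])


row = {"firstname": "James", "gender": "M", "salary": 3000}
print(partition_dir("/tmp/output/people2.parquet", row, ["gender", "salary"]))
# /tmp/output/people2.parquet/gender=M/salary=3000
```

Partitioning on a column with many distinct values, such as salary, produces many small directories, so low-cardinality columns are usually the better choice.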

Read & write Delta Table in PySpark

Read from a Delta Table

Read from a Registered Delta Table (in the Catalog)
df = spark.read.table("default.dim_dep")

df.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+
Read from a Delta Table Stored in a Directory (Path-Based)
df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_delta_table.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Write to a Delta Table

Append to a Delta Table
# Appends to the Delta table
df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")  
Overwrite a Delta Table
# Overwrites the Delta table
df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")  
Create or Overwrite a Registered Delta Table
# Overwrites the table in the catalog
df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep") 
Append to a Registered Delta Table:
# Appends to the table in the catalog
df.write.format("delta").mode("append").saveAsTable("default.dim_dep")  

merge operation (upsert)

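from delta.tables import DeltaTable

# Load the target Delta table; target_table_path and source_df are assumed to be defined earlier
target_table = DeltaTable.forPath(spark, target_table_path)

# Perform the merge operation
target_table.alias("t").merge(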
    source_df.alias("s"),
    "t.id = s.id"  # Join condition: match rows based on `id`
).whenMatchedUpdate(
    set={
        "name": "s.name",  # Update `name` column
        "date": "s.date"   # Update `date` column
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",      # Insert `id`
        "name": "s.name",  # Insert `name`
        "date": "s.date"   # Insert `date`
    }
).execute()

# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
  1. Target Table (target_table):
    • The Delta table is loaded using DeltaTable.forPath.
    • This table contains existing data where updates or inserts will be applied.
  2. Source DataFrame (source_df):
    • This DataFrame contains new or updated records.
  3. Join Condition ("t.id = s.id"):
    • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
  4. whenMatchedUpdate:
    • If a matching row is found, update the name and date columns in the target table.
  5. whenNotMatchedInsert:
    • If no matching row is found, insert the new record from the source DataFrame into the target table.
  6. execute():
    • Executes the merge operation, applying updates and inserts.
  7. Result Verification:
    • After the merge, the updated Delta table is read and displayed.
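
The update-or-insert logic described above can be modeled in plain Python. This is only a sketch of the semantics, not the Delta Lake API: rows are dictionaries, the function name merge_upsert and the sample rows are hypothetical, and it does not reproduce Delta's error when several source rows match the same target row.

```python
def merge_upsert(target, source, key="id"):
    """Return new rows: matched keys are updated from source, unmatched keys inserted."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)   # corresponds to whenMatchedUpdate
        else:
            merged[row[key]] = dict(row)   # corresponds to whenNotMatchedInsert
    return list(merged.values())


target = [
    {"id": 1, "name": "Ann", "date": "2024-01-01"},
    {"id": 2, "name": "Bob", "date": "2024-01-01"},
]
source = [
    {"id": 2, "name": "Robert", "date": "2024-02-01"},  # matches id 2: update
    {"id": 3, "name": "Cara", "date": "2024-02-01"},    # no match: insert
]
for row in merge_upsert(target, source):
    print(row)
```

Row 1 is untouched, row 2 takes the new name and date, and row 3 is added.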

Schema Evolution

df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
Read & Write JSON file

Read a simple JSON file

# Read a simple JSON file into dataframe
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

df_simple = spark.read.format("json") \
.option("multiline","True")\
.load("dbfs:/FileStore/simple_json.json")

df_simple.printSchema()
root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)

df_simple.show()
+------------+------------+-----+-----------+-------+
|        City|RecordNumber|State|ZipCodeType|Zipcode|
+------------+------------+-----+-----------+-------+
|BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+------------+------------+-----+-----------+-------+

Pay attention to .option("multiline", "true"). The sample JSON file spans multiple lines (see the sample data above), so without this option the load call still runs, but the resulting DataFrame is unusable because Spark expects one JSON object per line. Calling show() then raises this error:

df_simple = spark.read.format("json").load("dbfs:/FileStore/simple_json.json")
df_simple.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
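
Why the read fails without multiline can be modeled with the standard json module. By default Spark expects one complete JSON object per line, so each line of a pretty-printed file is unparsable on its own. The sketch below is a rough model of that behavior, not Spark's actual parser; the function name parse_line_delimited is hypothetical.

```python
import json

PRETTY = """{
  "RecordNumber": 10,
  "Zipcode": 709
}"""


def parse_line_delimited(text):
    """Parse each line as one JSON document; collect the lines that fail."""
    records, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)
    return records, corrupt


records, corrupt = parse_line_delimited(PRETTY)
print(len(records), len(corrupt))  # 0 4
print(json.loads(PRETTY))          # parsing the whole document succeeds
```

Every line ends up in the corrupt list, which mirrors a DataFrame whose only column is _corrupt_record.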

Reading from Multiline JSON (JSON Array) File

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
df_jsonarray_simple = spark.read\
    .format("json")\
    .option("multiline", "true")\
    .load("dbfs:/FileStore/jsonArrary.json")

df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

read complex json

df_complexjson=spark.read\
    .option("multiline","true")\
    .json("dbfs:/FileStore/jsonArrary2.json")

df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0001                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Cake                                                                                                                                      
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}                                                           
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}] 
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0002                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Raised                                                                                                                                    
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}]}                                                                                                                       
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]                                                           
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0003                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Old Fashioned                                                                                                                             
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}]}                                                                                                    
 topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]                                                                          

Reading from Multiple files at a time

# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json','resources/zipcode2.json'])

Reading all files in a folder

# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")

Reading files with a user-specified custom schema

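# Define custom schema
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, DoubleType, BooleanType)
schema = StructType([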
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()

Reading File using PySpark SQL

spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Write to JSON

Options

  • path: Specifies the path where the JSON files will be saved.
  • mode: Specifies the behavior when writing to an existing directory.
  • compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy").
    df2.write.option("compression", "gzip")
  • dateFormat: Specifies the format for date columns.
    df.write.option("dateFormat", "yyyy-MM-dd")
  • timestampFormat: Specifies the format for timestamp columns.
    df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  • lineSep: Specifies the character sequence to use as a line separator between JSON objects: \n (Unix/Linux newline) or \r\n (Windows newline).
    df.write.option("lineSep", "\r\n")
  • encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), and other Java-supported encodings.
    df.write.option("encoding", "UTF-8")
df2.write.format("json") \
    .option("compression", "gzip") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
    .option("encoding", "UTF-8") \
    .option("lineSep", "\r\n") \
    .save("dbfs:/FileStore/output_json")

modes

  1. Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
  2. Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
  3. Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
  4. Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
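
The four save modes differ only in what happens when the target already exists. The sketch below models that decision in plain Python with lists standing in for stored data. It is not a Spark API: the function name apply_save_mode is hypothetical, and FileExistsError stands in for the exception Spark raises.

```python
def apply_save_mode(existing, new_rows, mode="errorifexists"):
    """Model the outcome of a write; `existing` is None when the target does not exist."""
    mode = mode.lower()
    if mode not in ("append", "overwrite", "ignore", "error", "errorifexists"):
        raise ValueError(f"unknown save mode: {mode}")
    if existing is None:
        return list(new_rows)                   # every mode creates a missing target
    if mode == "append":
        return list(existing) + list(new_rows)
    if mode == "overwrite":
        return list(new_rows)
    if mode == "ignore":
        return list(existing)                   # the write is silently skipped
    raise FileExistsError("target already exists")  # error / errorifexists (default)


print(apply_save_mode(["old"], ["new"], "append"))     # ['old', 'new']
print(apply_save_mode(["old"], ["new"], "overwrite"))  # ['new']
print(apply_save_mode(["old"], ["new"], "ignore"))     # ['old']
```

Because the default is errorifexists, rerunning a job that writes to the same path fails unless a mode is set explicitly.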
Read & Write SQL Server

Read from SQL Server

server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"

jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

alternative way

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .load()

Select Specific Columns to Read

The example above reads the entire table into a PySpark DataFrame. When you only need some rows or columns, pass a SQL statement with the query option instead of dbtable; the two options cannot be specified together.

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("databaseName",database_name) \
  .option("query", "select * from [dbo].[dep] where depid>3") \
  .option("user", username) \
  .option("password", password) \
  .load()

df1.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    4|human resource|
|    5|         admin|
+-----+--------------+

write to table

write mode:

Append:
Use mode("append") to append rows to the existing SQL Server table.

# The variables were defined above; they are repeated here for reference
server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

# append the rows to the existing SQL Server table.

df_newrow.show()
+-----+-----+
|depid|  dep|
+-----+-----+
|    5|admin|
+-----+-----+

df_newrow.write \
  .format("jdbc") \
  .mode("append") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
|    5|         admin| <- new added row
+-----+--------------+

overwrite

By default, mode("overwrite") drops the table if it already exists and re-creates it without indexes. Use option("truncate","true") to truncate the table instead, which retains the indexes.

df_newrow.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
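
The snippet above drops and re-creates the table. A minimal sketch of the truncate variant follows, wrapped in a function so the connection details are passed in explicitly. The function name overwrite_keep_table is hypothetical, and df is assumed to be a PySpark DataFrame whose schema matches the existing table.

```python
def overwrite_keep_table(df, jdbc_url, table_name, username, password):
    """Replace all rows but keep the existing table definition (TRUNCATE, not DROP)."""
    (
        df.write
        .format("jdbc")
        .mode("overwrite")
        .option("truncate", "true")  # only takes effect together with mode("overwrite")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        .save()
    )
```

With the variables defined earlier it would be called as overwrite_keep_table(df_newrow, jdbc_url, table_name, username, password). If the DataFrame schema no longer matches the table, truncating is not enough and the default drop-and-recreate behavior is needed.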
Read & Write MySQL

Read from MySQL

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Select Specific Columns to Read

# Read selected columns from a MySQL table with a query
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("numPartitions",4) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Write to MySQL

Some points to note while writing

  • To rewrite an existing table, use mode("overwrite"). By default this drops the table if it already exists and re-creates it without indexes.
  • To retain the indexes, add option("truncate","true"), which truncates the table instead of dropping it.
  • The option("mssqlIsolationLevel", "READ_UNCOMMITTED") setting belongs to the SQL Server Spark connector, whose default isolation level is READ_COMMITTED; it does not apply to MySQL. With the generic JDBC data source used here, set option("isolationLevel", ...) instead.
  • The dbtable option specifies the name of the database table to read from or write to.
# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
Read JDBC in Parallel

With the numPartitions option, the PySpark JDBC data source can read a database table in parallel. The option applies to both reading and writing. On its own it only sets an upper limit; for Spark to split a read into parallel queries, partitionColumn, lowerBound, and upperBound must be set as well.

numPartitions is the maximum number of partitions that can be used for parallelism in table reading and writing. It also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark reduces it to the limit by calling coalesce(numPartitions) before writing.

# Read Table in Parallel
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable","employee") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
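
For the read to be split across connections, Spark's JDBC source needs partitionColumn, lowerBound, and upperBound in addition to numPartitions, and these cannot be combined with the query option. The sketch below builds such an option set as a plain dictionary. The function name partitioned_read_options is hypothetical, and the numeric column id and its bounds are assumptions about the employee table.

```python
def partitioned_read_options(url, table, user, password,
                             partition_column, lower_bound, upper_bound,
                             num_partitions):
    """Build JDBC options that let Spark split one table read into range queries."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "partitionColumn": partition_column,  # numeric, date, or timestamp column
        "lowerBound": str(lower_bound),       # bounds set the stride; they do not filter rows
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }


opts = partitioned_read_options(
    "jdbc:mysql://localhost:3306/emp", "employee", "root", "root",
    partition_column="id", lower_bound=1, upper_bound=100000, num_partitions=5,
)
# With a live session: df = spark.read.format("jdbc").options(**opts).load()
```

Rows outside the bounds are still read; they simply fall into the first or last partition, so badly chosen bounds lead to skewed partitions rather than missing data.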

Select columns with where clause

# Select columns with where clause
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Using fetchsize with numPartitions to Read

The fetchsize option specifies how many rows the JDBC driver retrieves per round trip, which can improve performance with drivers whose default is low (for example, the Oracle driver defaults to 10 rows). The default depends on the driver. Avoid very large values, which can cause memory pressure on the executors.

# Using fetchsize
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Read & Write delta table, Catalog, Hive Table

Read delta table, Catalog, Hive Table

# Read Hive table
df = spark.sql("select * from emp.employee")
df.show()
# Read Hive table
df = spark.read.table("employee")
df.show()
# Read delta table
df = spark.sql("select * from delta.` table path `  ")
df.show()

Caution: wrap the table path in backticks (`), not single quotes.

Write / Save

To save a PySpark DataFrame to Hive table use saveAsTable() function or use SQL CREATE statement on top of the temporary view.

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

use SQL, temporary view

# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView. 
spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")

# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

PySpark DataFrame

PySpark DataFrame is a distributed collection of rows, similar to a table in a relational database or a DataFrame in Python’s pandas library. It provides powerful tools for querying, transforming, and analyzing large-scale structured and semi-structured data.

PySpark apply functions

Apply a function to a column

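from pyspark.sql.functions import upper, udf
from pyspark.sql.types import StringType

# Built-in function applied with the DataFrame API
df.withColumn("Upper_Name", upper(df.Name))
df.select("Seqno", "Name", upper(df.Name))

# Built-in function applied through SQL
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB")

# User-defined function; the parameter is named s to avoid shadowing the built-in str
def upperCase(s):
    return s.upper() if s is not None else None

upperCaseUDF = udf(upperCase, StringType())                   # for the DataFrame API
spark.udf.register("upperCaseUDF", upperCase, StringType())   # required before use in SQL
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB")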
collect ( )

collect() is an action that retrieves all rows of the dataset (from all nodes) to the driver node. Use it only on small results, because the entire dataset must fit in the driver's memory.

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
dataCollect = deptDF.collect()
print(dataCollect)
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
Column Class

Access column

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)
+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

Column Operators

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+
#Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

df.select(df.col2 < df.col3).show()
+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

df.select(df.col2 == df.col3).show()
+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+
Convert DataFrame to Pandas

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.

pandasDF = pysparkDF.toPandas()
Convert RDD to DataFrame
df = rdd.toDF()
Create an empty DataFrame

#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create empty DataFrame from empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(emptyRDD,schema)
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
dropDuplicates, distinct ()

Key differences between distinct() and dropDuplicates()

  • distinct() considers all columns when identifying duplicates, while dropDuplicates() lets you specify a subset of columns to determine uniqueness. Called without arguments, dropDuplicates() behaves like distinct().
  • distinct() treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them is retained after applying distinct().
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |  #duplicated
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df.distinct().show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
Distinct count: 9
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

df2 = df.dropDuplicates(["department"])
print("Distinct count: "+str(df2.count()))
Distinct count: 3
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
+-------------+----------+------+
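
The difference between the two calls can be modeled in plain Python using the sample rows above. This is a sketch of the semantics only, with the hypothetical helper name drop_duplicates. It keeps the first occurrence of each key, whereas Spark does not guarantee which of the duplicate rows survives.

```python
COLUMNS = ("employee_name", "department", "salary")
ROWS = [
    ("James", "Sales", 3000), ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
    ("James", "Sales", 3000), ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100),
]


def drop_duplicates(rows, subset=None):
    """Keep one row per distinct key; the key is all columns unless subset is given."""
    positions = [COLUMNS.index(c) for c in (subset or COLUMNS)]
    seen, kept = set(), []
    for row in rows:
        key = tuple(row[i] for i in positions)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


print(len(drop_duplicates(ROWS)))                  # 9, same as distinct()
print(len(drop_duplicates(ROWS, ["department"])))  # 3, one row per department
```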
fillna() & fill()

DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other; both replace NULL/None values with a given value.

# Prepare Data
data = [("James", None, 3000), \
    ("Michael", "Sales", None), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", None), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", None, 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |null      |3000  |
|Michael      |Sales     |null  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |null  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |null      |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
#Replace null with 0 in all integer columns (string columns such as department are left unchanged)
df.na.fill(value=0).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|      null|  3000|
|      Michael|     Sales|     0|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|     0|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|      null|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
#Replace null with "unknown" in the department column only
df.na.fill(value="unknown",subset=["department"]).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|   unknown|  3000|
|      Michael|     Sales|  null|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  null|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|   unknown|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
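
The outputs above show that the fill value is applied only to columns of a matching type: 0 leaves the string column department untouched, and "unknown" leaves salary untouched. The sketch below models that rule in plain Python. It is not Spark's implementation: the helper name fill_nulls is hypothetical, and column types are inferred from the first non-null value rather than from a schema.

```python
def fill_nulls(rows, value, subset=None):
    """Replace None with value in columns whose type (string or not) matches value."""
    columns = subset if subset is not None else list(rows[0])

    def column_matches(col):
        sample = next((r[col] for r in rows if r[col] is not None), None)
        return sample is not None and isinstance(sample, str) == isinstance(value, str)

    targets = {c for c in columns if column_matches(c)}
    return [
        {c: (value if c in targets and v is None else v) for c, v in r.items()}
        for r in rows
    ]


rows = [
    {"employee_name": "James", "department": None, "salary": 3000},
    {"employee_name": "Michael", "department": "Sales", "salary": None},
]
print(fill_nulls(rows, 0))
print(fill_nulls(rows, "unknown", subset=["department"]))
```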
groupBy ( )

groupBy() is a transformation, similar to the SQL GROUP BY clause, that groups rows with the same values in the specified columns so that aggregate functions can be applied to each group.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|     Sales|   NY|     176000|     30000|
|     Sales|   CA|      81000|     23000|
|   Finance|   CA|     189000|     47000|
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
| Marketing|   CA|      80000|     18000|
+----------+-----+-----------+----------+
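
The aggregation above can be checked by hand with a plain-Python equivalent over the same sample rows. This is only a sketch of what groupBy followed by sum computes, not how Spark executes it; the function name sum_by_department_state is hypothetical.

```python
from collections import defaultdict

# (employee_name, department, state, salary, age, bonus)
EMPLOYEES = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]


def sum_by_department_state(rows):
    """Sum salary and bonus for each (department, state) pair."""
    totals = defaultdict(lambda: [0, 0])
    for _name, dept, state, salary, _age, bonus in rows:
        totals[(dept, state)][0] += salary
        totals[(dept, state)][1] += bonus
    return {key: tuple(value) for key, value in totals.items()}


for key, (salary, bonus) in sum_by_department_state(EMPLOYEES).items():
    print(key, salary, bonus)
```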
join ( )
  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and the matching rows from the right DataFrame; right-side columns are null where there is no match.
  • Right Join: Returns all rows from the right DataFrame and the matching rows from the left DataFrame; left-side columns are null where there is no match.
  • Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows.
  • Left Semi Join: Returns the rows from the left DataFrame that have a match in the right DataFrame, keeping only the left DataFrame's columns.
  • Left Anti Join: Returns the rows from the left DataFrame that have no match in the right DataFrame.
  • Self Join: Joins a DataFrame with itself, for example to pair each employee with their superior.
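
Left semi and left anti joins are the least familiar of these, so here is a plain-Python sketch of which rows each one keeps. It uses only the emp_id to emp_dept_id pairs and the dept_id values from the sample Emp and Dept datasets in this section; the function names are hypothetical and this is a model of the semantics, not the Spark API.

```python
EMP_DEPT = {1: 10, 2: 20, 3: 10, 4: 10, 5: 40, 6: 50}  # emp_id -> emp_dept_id
DEPT_IDS = {10, 20, 30, 40}                            # dept_id values


def left_semi(emp_dept, dept_ids):
    """Employees whose department exists in the Dept dataset."""
    return [emp for emp, dept in emp_dept.items() if dept in dept_ids]


def left_anti(emp_dept, dept_ids):
    """Employees whose department is missing from the Dept dataset."""
    return [emp for emp, dept in emp_dept.items() if dept not in dept_ids]


print(left_semi(EMP_DEPT, DEPT_IDS))  # [1, 2, 3, 4, 5]
print(left_anti(EMP_DEPT, DEPT_IDS))  # [6]
```

Employee 6 belongs to department 50, which has no row in the Dept dataset, so it is the only row a left anti join returns.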
# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
# Inner join
deptDF.join(empDF, deptDF.dept_id==empDF.emp_dept_id, "inner").show()
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
# Left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Returns: rows of the left DataFrame that have a match in the right DataFrame. Only the left DataFrame's columns are returned.
# Left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Returns: rows of the left DataFrame that have no match in the right DataFrame.
# Self join
from pyspark.sql.functions import col

empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
orderBy( ) and sort( )

orderBy() and sort() are interchangeable; orderBy() is an alias for sort().

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
# Sorting different columns in different orders

df.sort("state", "age",ascending=[False,True]).show()
df.sort(df["state"].desc(), df["age"].asc()).show()
df.orderBy("state", "age",ascending=[False,True]).show()
df.orderBy(df["state"].desc(), df["age"].asc()).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+
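The same ordering (state descending, then age ascending) can be checked in plain Python. This is a sketch of the sort semantics only, not Spark code; the row tuples are a cut-down copy of the sample data above.

```python
# Plain-Python sketch of a multi-column sort with mixed directions.
rows = [  # (employee_name, state, age)
    ("James", "NY", 34), ("Michael", "NY", 56), ("Robert", "CA", 30),
    ("Maria", "CA", 24), ("Raman", "CA", 40), ("Scott", "NY", 36),
    ("Jen", "NY", 53), ("Jeff", "CA", 25), ("Kumar", "NY", 50),
]

# Python's sort is stable, so sort by the secondary key first (age ascending),
# then by the primary key (state descending).
by_age = sorted(rows, key=lambda r: r[2])
result = sorted(by_age, key=lambda r: r[1], reverse=True)

names = [r[0] for r in result]
print(names)
```

The resulting name order matches the table above: the NY rows come first, each state group ordered by age.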
partitionBy ( )

pivot ( ) & Unpivot ( )

pivot() (Row to Column)

# Create sample data (assumes an existing SparkSession named spark)
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
# Output
root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

# Applying pivot()
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

# Output
root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

# one more example
pivotDF = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
root
 |-- Country: string (nullable = true)
 |-- Banana: long (nullable = true)
 |-- Beans: long (nullable = true)
 |-- Carrots: long (nullable = true)
 |-- Orange: long (nullable = true)

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|China  |400   |1500 |1200   |4000  |
|USA    |1000  |1600 |1500   |4000  |
|Mexico |null  |2000 |null   |null  |
|Canada |2000  |null |2000   |null  |
+-------+------+-----+-------+------+

Unpivot 

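Older PySpark versions have no unpivot function (DataFrame.unpivot() was added in Spark 3.4), so this example uses the SQL stack() function instead. It unpivots the first pivot result (grouped by Product). USA is not listed in the stack() expression, so it does not appear in the output.

# Applying unpivot with stack()
from pyspark.sql.functions import expr
# Recreate the Product-by-Country pivot, because pivotDF was reassigned above
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)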
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+
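To make the reshaping concrete, here is a plain-Python sketch of what pivot and unpivot do to the sample data. It is not Spark code, and the variable names are chosen for the example. Unlike the stack() expression above, this sketch unpivots all four countries, so it also produces the USA rows.

```python
from collections import defaultdict

data = [("Banana", 1000, "USA"), ("Carrots", 1500, "USA"), ("Beans", 1600, "USA"),
        ("Orange", 2000, "USA"), ("Orange", 2000, "USA"), ("Banana", 400, "China"),
        ("Carrots", 1200, "China"), ("Beans", 1500, "China"), ("Orange", 4000, "China"),
        ("Banana", 2000, "Canada"), ("Carrots", 2000, "Canada"), ("Beans", 2000, "Mexico")]

# Pivot: one row per Product, one column per Country, summing Amount.
countries = sorted({country for _, _, country in data})
totals = defaultdict(dict)
for product, amount, country in data:
    totals[product][country] = totals[product].get(country, 0) + amount
pivot = {p: [cells.get(c) for c in countries] for p, cells in totals.items()}

# Unpivot: back to one row per (Product, Country), skipping empty cells.
unpivot = [(p, c, v)
           for p, cells in pivot.items()
           for c, v in zip(countries, cells)
           if v is not None]

print(countries)
print(pivot["Orange"])
print(len(unpivot))
```

The two Orange/USA rows of 2000 are summed into a single 4000 cell by the pivot, which is why unpivoting does not give back the original 12 rows.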
sample(), sampleBy()

sample() returns a random sample of records from the dataset.

Syntax

sample(withReplacement, fraction, seed=None)

  • fraction – Fraction of rows to generate, range [0.0, 1.0]. Note that it doesn’t guarantee to provide the exact number of the fraction of records.
  • seed – Seed for sampling (default a random seed). Used to reproduce the same random sampling.
  • withReplacement – Sample with replacement or not (default False).
df=spark.range(100)
print(df.sample(0.06).collect())

#Output: [Row(id=0), Row(id=2), Row(id=17), Row(id=25), Row(id=26), Row(id=44), Row(id=80)]

In the example above, the DataFrame has 100 records, so a 6% sample would be 6 records, but sample() returned 7. The fraction is an expected proportion, not an exact row count.

To get the same random sample on every run, pass the same seed value. Change the seed to get a different sample.

print(df.sample(0.1,123).collect())
#Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,123).collect())
#Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,456).collect())
#Output: 19,21,42,48,49,50,75,80
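One way to see why the sample size varies is to sketch sampling without replacement as an independent coin flip per row. The code below is a plain-Python illustration under that assumption. It is not Spark's sampler and will not reproduce the row ids shown above, and bernoulli_sample is a hypothetical helper name.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the result reproducible
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100))
a = bernoulli_sample(rows, 0.1, seed=123)
b = bernoulli_sample(rows, 0.1, seed=123)
c = bernoulli_sample(rows, 0.1, seed=456)

print(len(a), len(c))  # close to 10, but not guaranteed to be exactly 10
print(a == b)          # same seed, same sample
```

Because each row is decided on its own, the number of rows kept is only 10 on average; the seed controls which rows are kept, not how many.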

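sampleBy()

sampleBy() returns a stratified sample without replacement, using the fraction given for each value (stratum) of a column. A value that is not listed in fractions is sampled with fraction 0.

sampleBy(col, fractions, seed=None)

df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

#Output: [Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]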
select ( )
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+
# Select columns by different ways
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

# By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Nested Struct Columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+
# Select child columns
df2.select("name.firstname","name.lastname").show(truncate=False)
+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+
# Select all child columns
df2.select("name.*").show(truncate=False)
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+
show ( )
df.show()
+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+
df.show(truncate=False)
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)
-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
only showing top 3 rows
StructType & StructField

StructType

StructType defines the structure (schema) of a DataFrame. It is a collection of StructField objects, each with a name and a data type, and it can be nested to describe complex data types.

StructField

StructField defines a single column in the schema: its name, its data type, and whether it is nullable.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

# StructField(name, dataType, nullable)
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

nested StructType

# nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+
transform ( )

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# PySpark transform() usage
# Passing extra arguments (the 1000 below) to transform() requires Spark 3.3 or later
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount)
df2.show()
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+
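The chaining pattern itself is small: transform(func, *args) calls func with the DataFrame as its first argument and returns the result. The plain-Python sketch below illustrates that idea with a hypothetical Frame class (a list of row dicts) standing in for a DataFrame. It is not Spark code.

```python
class Frame:
    """Hypothetical stand-in for a DataFrame: a list of row dicts."""
    def __init__(self, rows):
        self.rows = rows

    def transform(self, func, *args, **kwargs):
        # Call func with this frame as the first argument and return its result.
        return func(self, *args, **kwargs)

def to_upper_str_columns(frame):
    return Frame([{**r, "CourseName": r["CourseName"].upper()} for r in frame.rows])

def reduce_price(frame, reduce_by):
    return Frame([{**r, "new_fee": r["fee"] - reduce_by} for r in frame.rows])

def apply_discount(frame):
    return Frame([{**r, "discounted_fee": r["new_fee"] - (r["new_fee"] * r["discount"]) / 100}
                  for r in frame.rows])

frame = Frame([{"CourseName": "Java", "fee": 4000, "discount": 5},
               {"CourseName": "PHP", "fee": 3000, "discount": 20}])

result = (frame.transform(to_upper_str_columns)
               .transform(reduce_price, 1000)
               .transform(apply_discount))
print(result.rows)
```

Each step takes a frame and returns a new one, which is what lets the calls be chained in reading order instead of nested as apply_discount(reduce_price(to_upper_str_columns(frame), 1000)).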
UDF

UDF (User Defined Function)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
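# Create a Python function
# Note: every word is followed by a space, so the result ends with a trailing space
def convertCase(text):
    resStr=""
    arr = text.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
# Convert the Python function to a PySpark UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z),StringType())
# or, equivalently, pass the function directly:
convertUDF = udf(convertCase,StringType())

# Apply the UDF to the Name column
df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show(truncate=False)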
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
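Because a UDF wraps an ordinary Python function, the function can be tested on its own before it is registered. The sketch below is a hypothetical variant, convert_case, that differs from convertCase() above in two ways: it returns None for None input (null column values reach a Python UDF as None) and it does not leave a trailing space.

```python
def convert_case(text):
    """Capitalize the first letter of each space-separated word."""
    if text is None:  # null values arrive in a Python UDF as None
        return None
    return " ".join(word[:1].upper() + word[1:] for word in text.split(" "))

for name in ["john jones", "tracey smith", "amy sanders", None]:
    print(repr(convert_case(name)))
```

A function like this could be passed to udf(...) in the same way as convertCase; the original version would raise an error on a null Name because None has no split() method.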

Registering PySpark UDF & use it on SQL

In order to use convertCase() function on PySpark SQL, you need to register the function with PySpark by using spark.udf.register().

spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
union ( ) & unionAll ( )

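union() and unionAll() return the same result, and both keep duplicate rows. unionAll() is the older name; prefer union(). To remove duplicates, call distinct() or dropDuplicates() on the result.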


df1
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+
df2
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
df1.union(df2).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|<--duplicated
|Maria        |Finance   |CA   |90000 |24 |23000|<--duplicated
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
unionByName ( )

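df1.unionByName(df2, allowMissingColumns=True)
unionByName() matches columns by name, so the column order can differ between df1 and df2. With allowMissingColumns=True (Spark 3.1 and later) the schemas can differ too; columns missing on one side are filled with null.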

df1
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+
df2
+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+
# different columns order
df3 = df1.unionByName(df2)
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+
# different columns name and order 
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+
df3 = df1.unionByName(df2, allowMissingColumns=True)
df3.show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+
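The name-based alignment can be sketched in plain Python. This is an illustration of the behaviour described above, not Spark's implementation; union_by_name and its parameters are hypothetical names chosen for the example.

```python
def union_by_name(cols1, rows1, cols2, rows2, allow_missing_columns=False):
    """Union two tables by column name; fill missing columns with None."""
    if set(cols1) != set(cols2) and not allow_missing_columns:
        raise ValueError("column names differ; set allow_missing_columns=True")
    # Keep the left table's column order, then append columns only found on the right.
    out_cols = list(cols1) + [c for c in cols2 if c not in cols1]
    out_rows = []
    for cols, rows in ((cols1, rows1), (cols2, rows2)):
        for row in rows:
            by_name = dict(zip(cols, row))
            out_rows.append(tuple(by_name.get(c) for c in out_cols))
    return out_cols, out_rows

cols, rows = union_by_name(["col0", "col1", "col2"], [(5, 2, 6)],
                           ["col1", "col2", "col3"], [(6, 7, 3)],
                           allow_missing_columns=True)
print(cols)
print(rows)
```

The result has the same shape as the table above: four columns, with None where a side did not have the column.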
where() & filter()

where() and filter() are interchangeable; where() is an alias for filter().

  • Use &, |, ~ for logical operations (AND, OR, NOT), and wrap each condition in parentheses when combining them.
  • Use ==, !=, >, <, >=, <= for comparisons.
  • Wrap column references in col() for clarity.
  • For SQL-like patterns, use column methods such as like(), isin(), and between():
  • IS NULL –> isNull()
  • IS NOT NULL –> isNotNull()
  • LIKE –> like("%abc%")
  • IN –> isin(18, 21, 25)
  • BETWEEN –> between(18, 25)
+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
df.select("gender").filter(df.gender == "M").show()
df.select("gender").where(df.gender == "F").show()
+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     M|
+------+

+------+
|gender|
+------+
|     F|
|     F|
+------+
withColumn()
from pyspark.sql.functions import date_add, col
df.withColumn("dob", date_add("dob", 10)).\
withColumn("newsalary",col("salary")*100).\
drop("middlename").show()
+---------+--------+----------+------+------+---------+
|firstname|lastname|       dob|gender|salary|newsalary|
+---------+--------+----------+------+------+---------+
|    James|   Smith|1991-04-11|     M|   300|    30000|
|  Michael|        |2000-05-29|     M|   400|    40000|
|   Robert|Williams|1978-09-15|     M|   400|    40000|
|    Maria|   Jones|1967-12-11|     F|   400|    40000|
|      Jen|   Brown|1980-02-27|     F|    -1|     -100|
+---------+--------+----------+------+------+---------+
withColumnRenamed ( )

withColumnRenamed() renames a DataFrame column. To rename several columns, chain one withColumnRenamed() call per column.

+--------------------+----------+------+------+
|                name|       dob|gender|salary|
+--------------------+----------+------+------+
|    {James, , Smith}|1991-04-01|     M|  3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|    -1|
+--------------------+----------+------+------+
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.show()
+--------------------+-----------+------+-------------+
|                name|DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|    {James, , Smith}| 1991-04-01|     M|         3000|
|   {Michael, Rose, }| 2000-05-19|     M|         4000|
|{Robert, , Williams}| 1978-09-05|     M|         4000|
|{Maria, Anne, Jones}| 1967-12-01|     F|         4000|
|  {Jen, Mary, Brown}| 1980-02-17|     F|           -1|
+--------------------+-----------+------+-------------+

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

PySpark Built-in Functions

PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.

Visual Summary of Categories

  • Basic Functions: alias, cast, lit, col, when, isnull, isnan
  • String Functions: concat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap
  • Date and Time Functions: current_date, datediff, to_date, year, hour, unix_timestamp, date_format
  • Mathematical Functions: abs, round, floor, sqrt, pow, exp, log, sin, cos, rand
  • Aggregation Functions: count, sum, avg, min, max, stddev, collect_list
  • Array and Map Functions: array, size, array_contains, explode, map_keys, map_values
  • Null Handling Functions: isnull, na.fill, na.drop, na.replace
  • Window Functions: row_number, rank, ntile, lag, lead, cume_dist, percent_rank
  • Statistical Functions: corr, covar_samp, approx_count_distinct, percentile_approx
  • UDF and Advanced Functions: udf, udf for SQL, pandas_udf, broadcast, schema_of_json, to_json

Sample DataFrames
df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
dfc:
+---+-----+-----------------------+
|id |color|current_datetime       |
+---+-----+-----------------------+
|1  |F    |2024-12-03 00:46:02.165|
|2  |E    |2024-12-03 00:46:02.165|
|3  |D    |2024-12-03 00:46:02.165|
|4  |G    |2024-12-03 00:46:02.165|
|6  |J    |2024-12-03 00:46:02.165|
|5  |I    |2024-12-03 00:46:02.165|
|7  |H    |2024-12-03 00:46:02.165|
|8  |K    |2024-12-03 00:46:02.165|
|9  |L    |2024-12-03 00:46:02.165|
+---+-----+-----------------------+

PySpark datetime related functions

PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.

date time Formatting

Common Date Format Patterns:
yyyy: Year
MM: Month
dd: Day
HH: Hour
mm: Minute
ss: Second

from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
          .show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime       |formatted_date|
+---+-----+-----------------------+--------------+
|1  |F    |2024-12-03 00:46:02.165|2024-12-03    |
|2  |E    |2024-12-03 00:46:02.165|2024-12-03    |
+---+-----+-----------------------+--------------+
Converting Between Types
  • to_date(column, format): Converts a string to a date.
  • unix_timestamp(column, format): Converts a string to a Unix timestamp.
from pyspark.sql.functions import to_date, unix_timestamp, current_date

# dfc has no current_date column, so call the current_date() function instead
dfc.withColumn("date_only", to_date(current_date())) \
    .withColumn("unix_time", unix_timestamp(current_date())) \
    .select(current_date(), "date_only", "unix_time") \
    .show(truncate=False)
+--------------+----------+----------+
|current_date()|date_only |unix_time |
+--------------+----------+----------+
|2024-12-06    |2024-12-06|1733443200|
|2024-12-06    |2024-12-06|1733443200|
  • to_timestamp(column, format): Converts a string to a timestamp. When format is omitted, the string is parsed with Spark's default casting rules (for example yyyy-MM-dd HH:mm:ss).
  • from_unixtime(unix_time, format): Converts a Unix timestamp to a string.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit

df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
|    string|
+----------+
|2024-12-05|
+----------+

df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
|    string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+

df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
|    string|unixTimeStamp|
+----------+-------------+
|2024-12-05|   1733343200|
+----------+-------------+

df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
|   1733343200|                              2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
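The two Unix timestamps used above can be cross-checked with Python's standard library. This sketch assumes UTC; from_unixtime() renders the value in the Spark session time zone, so the string differs when the session uses another zone.

```python
from datetime import datetime, timezone

# Unix timestamp -> formatted string (what from_unixtime does, here in UTC)
ts = 1733343200
text = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
print(text)  # 2024-12-04 20:13:20

# Date -> Unix timestamp (what unix_timestamp does, here at midnight UTC)
midnight = datetime(2024, 12, 6, tzinfo=timezone.utc)
print(int(midnight.timestamp()))  # 1733443200
```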
Date time Arithmetic / calculations
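  • date_add(): adds a number of days to a date
  • date_sub(): subtracts a number of days from a date
  • add_months(): adds a number of months (a negative value subtracts)

from pyspark.sql.functions import col, add_months, date_add, date_sub

df.select(col("input"),
    add_months(col("input"),3).alias("add_months"),
    add_months(col("input"),-3).alias("sub_months"),
    date_add(col("input"),4).alias("date_add"),
    date_sub(col("input"),4).alias("date_sub")
  ).show()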
+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )

PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.

from pyspark.sql.functions import col, current_date, datediff

df2 = df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(), col("date")).alias("datediff")
    )
df2.show()
+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2024-12-06|    1985|
|2019-06-24|  2024-12-06|    1992|
|2019-08-24|  2024-12-06|    1931|
+----------+------------+--------+
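The datediff column can be checked by hand with ordinary date subtraction. This is a plain-Python sketch that assumes the current_date shown above (2024-12-06); it does not call Spark.

```python
from datetime import date

today = date(2024, 12, 6)  # the current_date value shown in the output above
dates = [date(2019, 7, 1), date(2019, 6, 24), date(2019, 8, 24)]

# datediff(end, start) is the number of days from start to end.
diffs = [(today - d).days for d in dates]
print(diffs)
```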
months_between ( )

The PySpark SQL months_between() function returns the number of months between two dates.

from pyspark.sql.functions import col, current_date, datediff, months_between, round

df3 = df.withColumn("today", current_date()) \
    .withColumn("monthsDiff", months_between(current_date(), col("date"))) \
    .withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
df3.show()
+---+----------+----------+-----------+----------------+
| id|      date|     today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
|  1|2019-07-01|2024-12-06|65.16129032|           65.16|
|  2|2019-06-24|2024-12-06|65.41935484|           65.42|
|  3|2019-08-24|2024-12-06|63.41935484|           63.42|
+---+----------+----------+-----------+----------------+
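The fractional part of monthsDiff comes from treating the leftover days as a share of a 31-day month, with the result rounded to 8 digits. The sketch below reproduces the numbers in the table under that reading of the Spark documentation; months_between_sketch is a hypothetical helper that handles plain dates only and ignores time of day.

```python
import calendar
from datetime import date

def months_between_sketch(end: date, start: date) -> float:
    """Hypothetical helper approximating months_between() for plain dates."""
    whole_months = (end.year - start.year) * 12 + (end.month - start.month)
    end_is_last = end.day == calendar.monthrange(end.year, end.month)[1]
    start_is_last = start.day == calendar.monthrange(start.year, start.month)[1]
    # Same day of month, or both dates on a month end: a whole number of months.
    if end.day == start.day or (end_is_last and start_is_last):
        return float(whole_months)
    # Otherwise the leftover days count as a fraction of a 31-day month.
    return round(whole_months + (end.day - start.day) / 31, 8)

today = date(2024, 12, 6)
for d in [date(2019, 7, 1), date(2019, 6, 24), date(2019, 8, 24)]:
    print(d, months_between_sketch(today, d))
```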
Differences Between Dates in Years

Use the months_between() function to get the difference in months, then divide by 12 to convert it into years.

from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit

df4 = df.withColumn("today", current_date()) \
  .withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
  .withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))
df4.show()

+---+----------+----------+-----------------+---------------+
| id|      date|     today|        yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
|  1|2019-07-01|2024-12-06|5.430107526666667|           5.43|
|  2|2019-06-24|2024-12-06|5.451612903333333|           5.45|
|  3|2019-08-24|2024-12-06|5.284946236666666|           5.28|
+---+----------+----------+-----------------+---------------+
timediff(column1, column2) Calculates the difference between two

trunc ( )

trunc(column, format) truncates a date to the given unit and returns the first day of that month or year.
e.g. 2024-10-08, truncate month –> 2024-10-01; truncate year –> 2024-01-01

from pyspark.sql.functions import col, trunc

df.select(col("input"),
    trunc(col("input"), "Year").alias("Year_Trunc"),
    trunc(col("input"), "Month").alias("Month_Trunc")
   ).show()
+----------+----------+-----------+
|     input|Year_Trunc|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval

interval: Used for advanced time calculations. It is not a standalone DataFrame function, but INTERVAL literals can be used in Spark SQL expressions, for example expr("input + INTERVAL 1 DAY").


Extracting Components from a Datetime
  • year(column): Extracts the year.
  • month(column): Extracts the month.
  • quarter(column): Returns the quarter as an integer from a given date or timestamp.
  • dayofyear(column): Extracts the day of the year from a given date or timestamp.
  • dayofmonth(column): Extracts the day of the month.
  • dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
  • weekofyear(column): Returns the week number of the year.
  • last_day(column): Returns the last day of the month for a given date or timestamp column. The result is a date column in which each value is the last day of the month of the original date.
  • next_day(column, day_of_week): Returns the first date after the given date that falls on the specified day of the week (e.g. "Mon", "Sunday").
  • hour(column): Extracts the hour.
  • minute(column): Extracts the minute.
  • second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second

df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input                  |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1      |1    |2   |46    |2     |
|2023-01-11 15:35:32.265|2023|1      |1    |15  |35    |32    |
|2022-09-17 22:16:02.186|2022|3      |9    |22  |16    |2     |
+-----------------------+----+-------+-----+----+------+------+

from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format

df.select(date_format("input", "yyyy-MM-dd").alias("input"),
    dayofweek("input").alias('dayofweek'), 
     dayofmonth("input").alias('dayofmonth'),
     weekofyear("input").alias("weekofyear") ,
     next_day("input","mon").alias("nextday"),
     last_day("input").alias('lastday')
  ).show()
+----------+---------+----------+----------+----------+----------+
|     input|dayofweek|dayofmonth|weekofyear|   nextday|   lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01|        2|         1|         1|2024-01-08|2024-01-31|
|2023-01-11|        4|        11|         2|2023-01-16|2023-01-31|
|2022-09-17|        7|        17|        37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
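The numbering conventions are easy to mix up: dayofweek() counts from Sunday = 1, weekofyear() follows ISO 8601 weeks, and next_day() returns a date strictly after the input. The sketch below restates those rules with the standard library so the table above can be verified by hand. All *_sketch helpers are hypothetical names, and next_day_sketch takes an ISO weekday number rather than a day name.

```python
import calendar
from datetime import date, timedelta

def dayofweek_sketch(d: date) -> int:
    # Spark numbering: 1 = Sunday ... 7 = Saturday.
    return d.isoweekday() % 7 + 1

def weekofyear_sketch(d: date) -> int:
    # ISO 8601 week number (weeks start on Monday).
    return d.isocalendar()[1]

def last_day_sketch(d: date) -> date:
    # Last calendar day of the month that contains d.
    return d.replace(day=calendar.monthrange(d.year, d.month)[1])

def next_day_sketch(d: date, iso_weekday: int) -> date:
    # First date strictly after d that falls on iso_weekday (1 = Monday).
    days_ahead = (iso_weekday - d.isoweekday() - 1) % 7 + 1
    return d + timedelta(days=days_ahead)

for d in [date(2024, 1, 1), date(2023, 1, 11), date(2022, 9, 17)]:
    print(d, dayofweek_sketch(d), weekofyear_sketch(d),
          next_day_sketch(d, 1), last_day_sketch(d))
```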
Filtering – current_date, current_timestamp
  • current_date (): Returns the current date as a date column.
  • current_timestamp (): Returns the current timestamp as a timestamp column.
from pyspark.sql.functions import current_date, current_timestamp

dfc.withColumn ("current_date", current_date())\
    .withColumn ("current_timestamp", current_timestamp())\
    .select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp      |
+------------+-----------------------+
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
+------------+-----------------------+

PySpark string related functions

btrim (str[, trim] ), trim ( )

btrim(str[, trim]) removes the characters listed in trim from the beginning and end of the string str.

  • trim (): Removes only space characters from the beginning and end of a string.
    Use it when you only need to clean up surrounding spaces.
  • btrim(str[, trim]): Removes all specified leading and trailing characters from a string.
    Use it when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim
+-------------+
|      text   |
+-------------+
|   hello     |
|  !!spark!!  |
| **PySpark** |
+-------------+
df.withColumn("trimmed", trim("text")).show()
+-----------+-----------+
|       text|    trimmed|
+-----------+-----------+
|   hello   |      hello|
| !!spark!! |  !!spark!!|
|**PySpark**|**PySpark**|
+-----------+-----------+
df.withColumn("trimmed_custom", btrim("text", " !*")).show()
+-----------+--------------+
|       text|trimmed_custom|
+-----------+--------------+
|   hello   |         hello|
| !!spark!! |         spark|
|**PySpark**|       PySpark|
+-----------+--------------+
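The second argument of btrim() is a set of characters, not a literal prefix or suffix: any run of those characters is stripped from both ends. Python's str.strip() follows the same idea, so a plain-Python sketch (not PySpark) makes the difference between the two calls easy to see:

```python
samples = ["   hello   ", " !!spark!! ", "**PySpark**"]

# Like trim(): strip only space characters from both ends.
trimmed = [s.strip(" ") for s in samples]

# Like btrim(text, " !*"): strip any of the listed characters from both ends.
trimmed_custom = [s.strip(" !*") for s in samples]

print(trimmed)
print(trimmed_custom)
```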
concat ( ) , concat_ws ()

Both functions concatenate multiple string columns or expressions into a single string column.

  • concat ( ): No delimiter is added between the concatenated values.
    Use it when you need strict concatenation without any delimiter.
  • concat_ws ( ): Inserts the specified delimiter between the values and skips NULL values.
    Use it when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit

df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|      null|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+

df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|     Alice|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+
concat_ws() skips NULL values; concat() returns NULL as soon as any input is NULL.
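The NULL handling in the "Alice" row can be restated in a few lines of plain Python, with None standing in for NULL. The two helper names below are hypothetical and exist only for this illustration.

```python
from typing import Optional

def concat_sketch(*parts: Optional[str]) -> Optional[str]:
    # Like concat(): a single None (NULL) makes the whole result None.
    if any(p is None for p in parts):
        return None
    return "".join(parts)

def concat_ws_sketch(sep: str, *parts: Optional[str]) -> str:
    # Like concat_ws(): None (NULL) inputs are skipped before joining.
    return sep.join(p for p in parts if p is not None)

print(concat_sketch("Alice", " ", None))
print(concat_ws_sketch(" ", "Alice", None))
```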
endswith ( )

endswith() returns a boolean indicating whether the column value ends with the given string.

+--------------+
|          text|
+--------------+
|   hello world|
|PySpark is fun|
|       welcome|
+--------------+
from pyspark.sql.functions import col

df.select("text", col("text").endswith("fun").alias("end_with?")).show()
+--------------+---------+
|          text|end_with?|
+--------------+---------+
|   hello world|    false|
|PySpark is fun|     true|
|       welcome|    false|
+--------------+---------+

df_filtered = df.filter(df["text"].endswith("fun"))
df_filtered.show()
+--------------+
|          text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )

contains() checks whether a PySpark DataFrame column contains a specific string and returns a boolean.

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
|     full_name|contain?|
+--------------+--------+
|      John Doe|   false|
|    Jane Smith|    true|
|Robert Johnson|   false|
+--------------+--------+

substring_to_check = "Smith"
df.filter(col("full_name").contains(substring_to_check)).show()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )

find_in_set(str, str_array) returns the 1-based index of the string str in the comma-delimited list str_array. It returns 0 if the string is not found or if str itself contains a comma.
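Since no example accompanies this function, here is a minimal plain-Python sketch of the lookup rule as I understand it from the Spark SQL documentation. find_in_set_sketch is a hypothetical helper, not the PySpark function itself.

```python
def find_in_set_sketch(value: str, str_array: str) -> int:
    """Hypothetical helper mirroring find_in_set() on plain strings."""
    # A value containing a comma can never match a single list entry.
    if "," in value:
        return 0
    entries = str_array.split(",")
    # list.index is 0-based, so add 1; 0 signals "not found".
    return entries.index(value) + 1 if value in entries else 0

print(find_in_set_sketch("ab", "abc,b,ab,c,def"))
print(find_in_set_sketch("x", "abc,b,ab,c,def"))
```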

length ( )

length ( ) Provides the length of characters for string data or the number of bytes for binary data.

from pyspark.sql.functions import length

df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
|      text|char_length|
+----------+-----------+
|     hello|          5|
|   PySpark|          7|
|Databricks|         10|
+----------+-----------+
like ( )

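like ( ) matches a column against a SQL LIKE pattern, where % matches any sequence of characters and _ matches exactly one character. To combine several conditions in PySpark, use the & and | operators (&& and || are the Scala equivalents).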

+--------------+
|          name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
|          name|   R%|  %th|%John%|
+--------------+-----+-----+------+
|      John Doe|false|false|  true|
|    Jane Smith|false| true| false|
|Robert Johnson| true|false|  true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
|          name|
+--------------+
|Robert Johnson|
+--------------+
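To make the wildcard rules concrete, the sketch below translates a LIKE pattern into a regular expression and evaluates it against the three names above. It is a plain-Python illustration only; like_sketch is a hypothetical helper and does not handle LIKE escape characters.

```python
import re

def like_sketch(value: str, pattern: str) -> bool:
    """Hypothetical helper: evaluate a SQL LIKE pattern against a string."""
    regex_parts = []
    for ch in pattern:
        if ch == "%":
            regex_parts.append(".*")           # any sequence of characters
        elif ch == "_":
            regex_parts.append(".")            # exactly one character
        else:
            regex_parts.append(re.escape(ch))  # literal character
    # LIKE has to match the whole value, hence fullmatch.
    return re.fullmatch("".join(regex_parts), value, flags=re.DOTALL) is not None

names = ["John Doe", "Jane Smith", "Robert Johnson"]
for name in names:
    print(name, like_sketch(name, "R%"), like_sketch(name, "%th"), like_sketch(name, "%John%"))
```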
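startswith ( )

startswith() returns a boolean indicating whether the column value starts with the given string; it is commonly used inside filter().
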
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )

substring (str, pos[, len]): Returns the substring of str that starts at position pos (1-based) and has length len. Column.substr(pos, len) does the same as a column method.

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+
from pyspark.sql.functions import substring

df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
|    Ja|       Ja|
|    An|       An|
|    To|       To|
|    To|       To|
+------+---------+
split ( )

split() splits a string column into an array around a delimiter pattern; the array elements can then be extracted into separate columns.

from pyspark.sql.functions import split

df_with_split = df.select("full_name", split(df["full_name"], ",").alias("split_names"))
df_with_split.show()
+--------------+-----------------+
|     full_name|      split_names|
+--------------+-----------------+
|      John,Doe|      [John, Doe]|
|    Jane,Smith|    [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+

split_columns = split(df["full_name"], ",")
df_with_split = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_with_split.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+

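# Build the array column first, then pull out its items with getItem()
df_names = df.select("full_name", split(df["full_name"], ",").alias("split_names"))
df_expanded = df_names.select(
    "full_name",
    df_names["split_names"].getItem(0).alias("first_name"),
    df_names["split_names"].getItem(1).alias("last_name")
)
df_expanded.show()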
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+
translate ( )

translate() replaces characters one by one in a DataFrame column value: each character of the matching string is replaced by the character at the same position in the replacement string.

from pyspark.sql.functions import translate
df.withColumn("replaced", translate("new_color", "aoml", "A0N_")).show(3)
+-----+----------------+----------------+
|color|       new_color|        replaced|
+-----+----------------+----------------+
|    F|almost colorless|A_N0st c0_0r_ess|
|    E|            null|            null|
|    D|       colorless|       c0_0r_ess|
+-----+----------------+----------------+
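Python's str.translate() applies the same position-by-position mapping, which makes it a convenient way to check the replaced column by hand. This is a plain-Python sketch, not PySpark; note that str.maketrans() requires both strings to have equal length.

```python
# Map a->A, o->0, m->N, l->_ (same pairs as translate("new_color", "aoml", "A0N_")).
table = str.maketrans("aoml", "A0N_")

replaced = ["almost colorless".translate(table), "colorless".translate(table)]
print(replaced)
```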
regexp_replace ( )

regexp_replace() replaces every part of a column value that matches a regular expression with another string.

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

from pyspark.sql.functions import regexp_replace

df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)
+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address', 
    when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
   .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False)
+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+
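Because the pattern is a regular expression that matches anywhere in the string, a bare 'St' would also rewrite a street name such as "Stone". Anchoring the pattern to the end of the value with $ is one way to avoid that. The sketch below shows the idea with Python's re module; the suffix table and expand_suffix are assumptions made for illustration.

```python
import re

# Suffix pattern -> replacement; "$" anchors each pattern to the end of the value.
suffixes = {r"Rd$": "Road", r"St$": "Street", r"Ave$": "Avenue"}

def expand_suffix(address: str) -> str:
    for pattern, replacement in suffixes.items():
        address = re.sub(pattern, replacement, address)
    return address

addresses = ["14851 Jeffrey Rd", "43421 Margarita St", "13111 Siemon Ave"]
expanded = [expand_suffix(a) for a in addresses]
print(expanded)
print(expand_suffix("12 Stone St"))  # only the trailing "St" is expanded
```

The same anchored patterns can be passed to regexp_replace(), since it also takes a regular expression as its second argument.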
overlay

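overlay(src, replace, pos[, len]) replaces the part of src that starts at position pos (1-based) with replace. When len is omitted, as many characters are replaced as replace contains.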

+---------------+----+
|           col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay

df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
|      overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
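The position arithmetic is simple to get wrong by one, so here is a plain-Python sketch of the same operation using slicing. overlay_sketch is a hypothetical helper; the default of -1 for the length mirrors the PySpark signature.

```python
def overlay_sketch(src: str, replace: str, pos: int, length: int = -1) -> str:
    """Hypothetical helper mirroring overlay() on plain strings."""
    # pos is 1-based; a negative length means "use the length of replace".
    if length < 0:
        length = len(replace)
    start = pos - 1
    return src[:start] + replace + src[start + length:]

print(overlay_sketch("ABCDE_123486789", "FGH", 7))     # overwrite three characters
print(overlay_sketch("ABCDE_123486789", "FGH", 7, 0))  # length 0 inserts instead
```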
upper ( ), lower ( ), initcap ( )
  • upper: Converts all characters in the column to uppercase.
  • lower: Converts all characters in the column to lowercase.
  • initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
|             text|
+-----------------+
|      hello world|
|spark sql example|
|  UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap

df.withColumn("Uppercase", upper("text"))\
   .withColumn("lowercase", lower("text"))\
    .withColumn("Capitalized", initcap("text"))\
    .show()
+-----------------+-----------------+-----------------+-----------------+
|             text|        Uppercase|        lowercase|      Capitalized|
+-----------------+-----------------+-----------------+-----------------+
|      hello world|      HELLO WORLD|      hello world|      Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
|  UPPER and LOWER|  UPPER AND LOWER|  upper and lower|  Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+

Numeric Functions

Mathematical operations on numeric columns.

abs()

abs(): Absolute value.

from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )

round(): Round to a specific number of decimals.

from pyspark.sql.functions import round
df.select(round(df["column"], 2))
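One point worth knowing: Spark's round() is documented as using HALF_UP rounding, while Python's built-in round() (like Spark's bround()) uses HALF_EVEN, so ties such as 2.5 can give different results. The sketch below contrasts the two modes with the decimal module; the helper names are hypothetical and the inputs are passed as strings to avoid binary floating-point noise.

```python
from decimal import Decimal, ROUND_HALF_UP, ROUND_HALF_EVEN

def round_half_up(value: str, places: int) -> Decimal:
    # HALF_UP: ties move away from zero.
    quantum = Decimal(1).scaleb(-places)
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_UP)

def round_half_even(value: str, places: int) -> Decimal:
    # HALF_EVEN: ties go to the neighbour whose last digit is even.
    quantum = Decimal(1).scaleb(-places)
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_EVEN)

print(round_half_up("2.5", 0), round_half_even("2.5", 0))
print(round_half_up("1.005", 2), round_half_even("1.005", 2))
```

Also note that `from pyspark.sql.functions import round` shadows Python's built-in round() in the importing module; importing the module under an alias is a common way around that.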
pow ( )

pow(): Power function.

from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))

Aggregate Functions

sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
distinct, countDistinct, approx_count_distinct
  • approx_count_distinct ( ): returns the approximate count of distinct items in a group, trading exactness for speed on large data.
  • countDistinct ( ): returns the exact count of distinct items in a group.
  • distinct ( ): returns the distinct rows of a DataFrame.
from pyspark.sql.functions import approx_count_distinct, countDistinct

df.select(approx_count_distinct("salary").alias("approx_count_distinct"), 
          countDistinct("salary").alias("countDistinct"),
          ).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
|                    6|            6|
+---------------------+-------------+

df.select("salary").distinct().show()
+------+
|salary|
+------+
|  3000|
|  4600|
|  4100|
|  3300|
|  3900|
|  2000|
+------+
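avg, sum, sumDistinct, max (), min (), mean ( )
  • avg ( ): returns the average of the values in the input column.
  • sum ( ): returns the sum of all values in a column.
  • sumDistinct ( ): returns the sum of all distinct values in a column (newer Spark versions also provide it as sum_distinct).
  • max ( ): returns the maximum value in a column.
  • min ( ): returns the minimum value in a column.
  • mean ( ): returns the mean of the values in a column; an alias of avg ( ).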
from pyspark.sql.functions import avg,sum, max, min

df.select(avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
    ).show()
+------+-----+----+----+
|   avg|  sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+
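The aggregate results above can be cross-checked against the sample salaries with ordinary Python collections. This is a plain-Python sketch, not PySpark; a set plays the role of "distinct values".

```python
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]

distinct_salaries = set(salaries)        # like distinct() on the salary column
count_distinct = len(distinct_salaries)  # like countDistinct("salary")
sum_all = sum(salaries)                  # like sum("salary")
sum_distinct = sum(distinct_salaries)    # like sumDistinct("salary")
average = sum_all / len(salaries)        # like avg("salary")

print(count_distinct, sum_all, sum_distinct, average, max(salaries), min(salaries))
```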

mean ( ) returns the mean of the values in a column and is an alias of avg ( ). Null values are ignored. It can be used with select() or groupBy().

sample df
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4| null|
+---+-----+
from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
+----------+
|mean_value|
+----------+
|      20.0|
+----------+
first (), last ()
  • first() returns the first element in a column. When ignoreNulls is set to true, it returns the first non-null element.
  • last() returns the last element in a column. When ignoreNulls is set to true, it returns the last non-null element.
  • Both results depend on the order of the rows, which is not guaranteed after a shuffle.
from pyspark.sql.functions import first, last

df.select(first("salary").alias("first"),
         last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
collect_list ( )

PySpark – collect_list() returns all values from an input column with duplicates.

from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )

PySpark – collect_set() returns all values from an input column without duplicates.

from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+

PySpark Window functions

PySpark’s window ranking functions, such as row_number(), rank(), and dense_rank(), assign sequential numbers to DataFrame rows within defined partitions, following the ordering specified for the window. They make it possible to rank rows and to identify a row's position inside its partition.

  • row_number() assigns unique sequential numbers,
  • rank() provides the ranking with gaps,
  • dense_rank() offers ranking without gaps.
row_number ()

The row_number() window function assigns a sequential row number, starting from 1, to the rows of each window partition.

from pyspark.sql.window import Window
from pyspark.sql.functions import col,  row_number

windowSpec  = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
    .where(col("row_number") < 4).show()

+---+---------+-----+----------+
|_c0|      cut|color|row_number|
+---+---------+-----+----------+
|677|     Fair|    D|         1|
|772|     Fair|    D|         2|
|940|     Fair|    D|         3|
| 43|     Good|    D|         1|
| 44|     Good|    D|         2|
|239|     Good|    D|         3|
| 63|    Ideal|    D|         1|
| 64|    Ideal|    D|         2|
|121|    Ideal|    D|         3|
| 55|  Premium|    D|         1|
| 62|  Premium|    D|         2|
|151|  Premium|    D|         3|
| 29|Very Good|    D|         1|
| 35|Very Good|    D|         2|
| 39|Very Good|    D|         3|
+---+---------+-----+----------+
rank ()

rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()

+-----+---------+-----+-----+----+
|  _c0|      cut|color|price|rank|
+-----+---------+-----+-----+----+
|   29|Very Good|    D|  357|   1|
|28262|Very Good|    D|  357|   1|
|28272|     Good|    D|  361|   3|
|28273|Very Good|    D|  362|   4|
|28288|Very Good|    D|  367|   5|
|31598|    Ideal|    D|  367|   5|
|31601|  Premium|    D|  367|   5|
|31602|  Premium|    D|  367|   5|
|31618|Very Good|    D|  373|   9|
|34922|Very Good|    D|  373|   9|
|38277|  Premium|    D|  386|  11|
|38278|  Premium|    D|  386|  11|
|38279|  Premium|    D|  386|  11|
|38280|  Premium|    D|  386|  11|
|41581|Very Good|    D|  388|  15|
|41582|Very Good|    D|  388|  15|
+-----+---------+-----+-----+----+
dense_rank ()

The dense_rank() window function ranks the rows within a window partition without leaving gaps: tied rows share a rank, and the next distinct value gets the next consecutive rank.

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
|  _c0|color|price|dense_rank|
+-----+-----+-----+----------+
|   29|    D|  357|         1|
|28262|    D|  357|         1|
|28272|    D|  361|         2|
|28273|    D|  362|         3|
|28288|    D|  367|         4|
|31598|    D|  367|         4|
|31601|    D|  367|         4|
|31602|    D|  367|         4|
|31618|    D|  373|         5|
|34922|    D|  373|         5|
|38277|    D|  386|         6|
|38278|    D|  386|         6|
|38279|    D|  386|         6|
+-----+-----+-----+----------+
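percent_rank ()

percent_rank() returns the relative rank of each row within a window partition, computed as (rank - 1) / (rows in the partition - 1).
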
from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0  |color|price|percent_rank         |
+-----+-----+-----+---------------------+
|29   |D    |357  |0.0                  |
|28262|D    |357  |0.0                  |
|28272|D    |361  |2.952465308532625E-4 |
|28273|D    |362  |4.428697962798937E-4 |
|28288|D    |367  |5.90493061706525E-4  |
|31598|D    |367  |5.90493061706525E-4  |
|31601|D    |367  |5.90493061706525E-4  |
|31602|D    |367  |5.90493061706525E-4  |
|31618|D    |373  |0.00118098612341305  |
|34922|D    |373  |0.00118098612341305  |
|38277|D    |386  |0.0014762326542663124|
|38278|D    |386  |0.0014762326542663124|
+-----+-----+-----+---------------------+
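The three ranking functions differ only in how they treat ties. The plain-Python sketch below computes all three for the first nine prices of the partition shown above. Because it treats those nine rows as the whole partition, the rank and dense_rank values match the tables, while the percent_rank values differ from the Spark output, which was computed over the full partition.

```python
prices = [357, 357, 361, 362, 367, 367, 367, 367, 373]  # already ordered by price

ranks, dense_ranks = [], []
for i, price in enumerate(prices):
    if i > 0 and price == prices[i - 1]:
        # Tie: reuse the previous row's values.
        ranks.append(ranks[-1])
        dense_ranks.append(dense_ranks[-1])
    else:
        # rank(): position in the partition, so gaps appear after ties.
        ranks.append(i + 1)
        # dense_rank(): next consecutive number, so no gaps.
        dense_ranks.append(dense_ranks[-1] + 1 if dense_ranks else 1)

n = len(prices)
# percent_rank(): (rank - 1) / (rows in partition - 1).
percent_ranks = [(r - 1) / (n - 1) for r in ranks]

print(ranks)
print(dense_ranks)
print(percent_ranks)
```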
lag (), lead ( )
  • lag ( ) function allows you to access a previous row’s value within the partition based on a specified offset.
  • lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag, lead
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
  .withColumn("lead",lead("salary",2).over(windowSpec))\
  .show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
|        Maria|   Finance|  3000|null|3900|
|        Scott|   Finance|  3300|null|null|
|          Jen|   Finance|  3900|3000|null|
|        Kumar| Marketing|  2000|null|null|
|         Jeff| Marketing|  3000|null|null|
|        James|     Sales|  3000|null|4100|
|        James|     Sales|  3000|null|4100|
|       Robert|     Sales|  4100|3000|4600|
|         Saif|     Sales|  4100|3000|null|
|      Michael|     Sales|  4600|4100|null|
+-------------+----------+------+----+----+
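With an offset of 2, each row looks two positions back (lag) or ahead (lead) inside its own partition and gets null when that position does not exist. A plain-Python sketch of the Sales partition, with None standing in for null, reproduces the columns above. The helper names are hypothetical and assume a non-negative offset.

```python
def lag_sketch(values, offset=1):
    # Row i sees the value `offset` rows earlier, or None near the start.
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]

def lead_sketch(values, offset=1):
    # Row i sees the value `offset` rows later, or None near the end.
    return [values[i + offset] if i + offset < len(values) else None for i in range(len(values))]

sales = [3000, 3000, 4100, 4100, 4600]  # Sales partition, ordered by salary
print(lag_sketch(sales, 2))
print(lead_sketch(sales, 2))
```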
ntile ( )

ntile (n) distributes the ordered rows of each window partition into n buckets of as equal size as possible and returns the bucket number (1 to n) for each row.

from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+
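When the row count is not divisible by n, the earlier buckets receive the extra rows, which is why the five Sales rows split 3 and 2. A minimal plain-Python sketch of that allocation (ntile_sketch is a hypothetical helper):

```python
def ntile_sketch(row_count: int, buckets: int) -> list:
    """Hypothetical helper: bucket number for each of row_count ordered rows."""
    base, extra = divmod(row_count, buckets)
    result = []
    for bucket in range(1, buckets + 1):
        # The first `extra` buckets take one additional row.
        size = base + (1 if bucket <= extra else 0)
        result.extend([bucket] * size)
    return result

print(ntile_sketch(3, 2))  # Finance
print(ntile_sketch(2, 2))  # Marketing
print(ntile_sketch(5, 2))  # Sales
```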

PySpark json related functions

sample data
+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )

The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.

from pyspark.sql.functions import explode
explode(col)

col: The name of the column or an expression containing an array or map to be exploded.

Return a new row for each element in the array or each key-value pair in the map.

# Usage with Arrays:
# Sample data
data = [
    (1, ["a", "b", "c"]),
    (2, ["d", "e"]),
    (3, [])
]
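df = spark.createDataFrame(data, ["id", "letters"])

# Explode the array column
exploded_df = df.select("id", explode("letters").alias("letter"))
exploded_df.show()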
+---+------+
| id|letter|
+---+------+
|  1|     a|
|  1|     b|
|  1|     c|
|  2|     d|
|  2|     e|
+---+------+

# Usage with Maps:
# Sample data
data = [
    (1, {"key1": "value1", "key2": "value2"}),
    (2, {"key3": "value3"}),
    (3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])

# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+

Working with JSON

Exploding a JSON Array

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import ArrayType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "values": ["a", "b", "c"]}',),
    ('{"id": 2, "values": ["d", "e"]}',),
    ('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data                           |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]}     |
|{"id": 3, "values": []}             |
+------------------------------------+

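# Define schema for JSON column
from pyspark.sql.types import StructType, StructField, IntegerType

json_schema = StructType([
    StructField("id", IntegerType()),
    StructField("values", ArrayType(StringType()))
])

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))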
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values   |
+---+---------+
|1  |[a, b, c]|
|2  |[d, e]   |
|3  |[]       |
+---+---------+

# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    c|
|  2|    d|
|  2|    e|
+---+-----+

Exploding a JSON Map

from pyspark.sql.types import MapType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
    ('{"id": 2, "properties": {"key3": "value3"}}',),
    ('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data                                                    |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}}                  |
|{"id": 3, "properties": {}}                                  |
+-------------------------------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties                      |
+---+--------------------------------+
|1  |{key1 -> value1, key2 -> value2}|
|2  |{key3 -> value3}                |
|3  |{}                              |
+---+--------------------------------+

# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+
  • Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
  • Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed.
from_json ( )

from_json ( ): Converts JSON string into Struct type or Map type.

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object

get_json_object() extracts a JSON element from a JSON string based on the specified JSON path.

from pyspark.sql.functions import col, get_json_object
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
json_tuple ( )

json_tuple() extracts fields from a JSON string and returns them as new columns.

from pyspark.sql.functions import col, json_tuple
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )

to_json() converts a MapType or StructType DataFrame column to a JSON string.

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )

The schema_of_json() function in PySpark infers the schema of a JSON string. The input must be a JSON string literal or a foldable (constant) string column, not an arbitrary column of varying values. It is particularly useful when you work with complex JSON data and need to define its schema for operations like parsing or transformation. It returns a string representation of the schema in DDL format, which can be passed directly to from_json().

pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) → Column

from pyspark.sql.functions import schema_of_json, col

# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'), 
        ("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]

df = spark.createDataFrame(data, columns)

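from pyspark.sql.functions import lit

# schema_of_json() needs a literal (foldable) JSON string, not an arbitrary column,
# so take one sample value from the column and wrap it in lit()
sample_json = df.select("json_data").first()[0]
schema = df.select(schema_of_json(lit(sample_json))).first()[0]
print(schema)

# Output on recent Spark versions (exact formatting varies by version)
STRUCT<age: BIGINT, name: STRING>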

expr ()

PySpark expr() is a SQL function that executes SQL-like expressions. It also lets you use an existing DataFrame column value as an expression argument to PySpark built-in functions.

from pyspark.sql.functions import expr

# Using CASE WHEN, similar to SQL
df2 = df.withColumn("gender", expr(
    "CASE WHEN gender = 'M' THEN 'Male' "
    "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END"))
df2.show()
+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

#Add Month value from another column
df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+

lit ()

The PySpark SQL function lit() adds a new column to a DataFrame by assigning a literal or constant value, and returns a Column. typedLit() belongs to the Scala API, where it additionally accepts collection types and makes the data type of the constant explicit.

from pyspark.sql.functions import col, lit
df2 = df.select(col("EmpId"), col("Salary"), lit("1").alias("lit_value1"))
df2.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|  111| 50000|         1|
|  222| 60000|         1|
|  333| 40000|         1|
+-----+------+----------+
typedLit ()

typedLit() is part of the Scala API and is not exposed in pyspark.sql.functions. In PySpark, the closest equivalent for a typed constant is lit() followed by cast():

df4 = df4.withColumn("lit_value3", lit("flag").cast(StringType()))
df4.show(truncate=False)

The difference between lit() and typedLit() is that typedLit() can handle collection types such as arrays and maps. In PySpark, collection literals are built with array() and create_map() over lit() values.


stack ()

The stack() function transforms columns into rows. It’s particularly useful when you have a wide DataFrame (with many columns) and want to “unpivot” or “melt” it into a longer format.

Syntax

stack(n: Int, exprs: String*): Column

Parameters

  • n: The number of rows to create per input row. The expressions in exprs are split evenly across these n rows.
  • exprs: The values to distribute. For unpivoting, pass label-value pairs in the form 'column_label', column_name, one pair per output row.

Sample DataFrame:
+---+---+---+---+
| id|  A|  B|  C|
+---+---+---+---+
|  1|100|200|300|
|  2|400|500|600|
+---+---+---+---+

# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
    "id",
    "stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|       A|  100|
|  1|       B|  200|
|  1|       C|  300|
|  2|       A|  400|
|  2|       B|  500|
|  2|       C|  600|
+---+--------+-----+

StructType & StructField

The StructType and StructField classes in PySpark are used to specify a custom schema for a DataFrame and to create complex columns such as nested struct, array, and map columns. StructType is a collection of StructField objects, each of which defines a column name, a column data type, a boolean indicating whether the field is nullable, and optional metadata.

Simple StructType and StructField

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Simple StructType and StructField
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])

df = spark.createDataFrame(data=data, schema=schema)

Nested StructType object struct

# Defining schema using nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

when ()

when() works like SQL's CASE WHEN or an if / else-if chain in a programming language: conditions are checked in order, and otherwise() supplies the default value.

from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
    when(dfc.color == "F", "almost colorless")
    .when(dfc.color == "D", "colorless")
    .when(dfc.color == "G", "indistinguishable colorless")
    .when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
    .otherwise("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color                  |
+-----+---------------------------+
|F    |almost colorless           |
|E    |very colorful              |
|D    |colorless                  |
|G    |indistinguishable colorless|
|J    |almost colorless           |
|I    |almost colorless           |
|H    |very colorful              |
|K    |very colorful              |
|L    |very colorful              |
+-----+---------------------------+

The same logic can be written with expr(), using a SQL CASE WHEN expression:

from pyspark.sql.functions import expr
df3 = df.withColumn("new_gender", expr(
    "CASE WHEN gender = 'M' THEN 'Male' "
    "WHEN gender = 'F' THEN 'Female' "
    "WHEN gender IS NULL THEN '' "
    "ELSE gender END"))

df.createOrReplaceTempView("EMP")
spark.sql(
    "select name, "
    "CASE WHEN gender = 'M' THEN 'Male' "
    "WHEN gender = 'F' THEN 'Female' "
    "WHEN gender IS NULL THEN '' "
    "ELSE gender END as new_gender from EMP").show()

Note: in the two examples above, the SQL text is split into segments that are joined into one string, so each segment must end with a space.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

DeltaTable vs DataFrame

In Databricks and PySpark, DeltaTables and DataFrames both handle structured data but differ in functionality and use cases. Here’s a detailed comparison:

Definitions

DeltaTable

A Delta table stores data as Apache Parquet files plus a transaction log, with support for ACID transactions, versioning, schema enforcement, and advanced file operations. It is managed by the Delta Lake protocol, offering features like time travel, upserts, and deletion. In code, the DeltaTable class is the API for working with such a table.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is an abstraction for structured and semi-structured data in Spark. It exists only within a Spark session, is evaluated lazily, and does not itself manage storage or transactions.

Features

Feature | DeltaTable | DataFrame
Persistence | Stores data on disk in a managed format. | Primarily in-memory abstraction (ephemeral).
Schema Enforcement | Enforces schema when writing/updating. | No schema enforcement unless explicitly specified.
ACID Transactions | Supports atomic writes, updates, and deletes. | Not transactional; changes require reprocessing.
Versioning | Maintains historical versions (time travel). | No versioning; a snapshot of data.
Upserts and Deletes | Supports MERGE, UPDATE, and DELETE. | Does not directly support these operations.
Performance | Optimized for storage (Z-order indexing, compaction). | Optimized for in-memory transformations.
Time Travel | Query historical data using snapshots. | No time travel support.
Indexing | Supports indexing (Z-order, data skipping). | No indexing capabilities.

Use Cases

DeltaTable

Ideal for persistent storage with advanced capabilities:

  • Data lakes or lakehouses.
  • ACID-compliant operations (e.g., MERGE, DELETE).
  • Time travel to access historical data.
  • Optimizing storage with compaction or Z-ordering.
  • Schema evolution during write operations.

DataFrame

Best for in-memory processing and transformations:

  • Ad-hoc queries and ETL pipelines.
  • Working with data from various sources (files, databases, APIs).
  • Temporary transformations before persisting into Delta or other formats.

Common APIs

DeltaTable

Load Delta table from a path:

from delta.tables import DeltaTable 
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

Merge data:

(
    delta_table.alias("target")
    .merge(source_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

Time Travel:

df = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta/table")

Optimize

OPTIMIZE '/path/to/delta/table' ZORDER BY (column_name);

DataFrame

Read

df = spark.read.format("parquet").load("/path/to/data")

Transformations

transformed_df = df.filter(df.age > 30).groupBy("gender").count()

Write

df.write.format("delta").save("/path/to/save")

Transition Between DeltaTables and DataFrames

Convert DeltaTable to DataFrame:

df = delta_table.toDF()

Write DataFrame to Delta format:

df.write.format("delta").save("/path/to/delta/table")


Overview of Commonly Used Unity Catalog and Spark SQL Management Commands

The table below summarizes frequently used Unity Catalog and Spark SQL management commands.

Category | Command | Description | Example
Catalog Management | SHOW CATALOGS | Lists all available catalogs. | SHOW CATALOGS;
Schema Management | SHOW SCHEMAS IN <catalog_name> | Lists schemas (databases) within a catalog. | SHOW SCHEMAS IN main;
Schema Management | DESCRIBE SCHEMA <catalog_name>.<schema_name> | Provides metadata about a specific schema. | DESCRIBE SCHEMA main.default;
Table Management | SHOW TABLES IN <catalog_name>.<schema_name> | Lists all tables in a schema. | SHOW TABLES IN main.default;
Table Management | DESCRIBE TABLE <catalog_name>.<schema_name>.<table_name> | Displays metadata about a specific table. | DESCRIBE TABLE main.default.sales_data;
Table Management | SHOW PARTITIONS <catalog_name>.<schema_name>.<table_name> | Lists partitions of a partitioned table. | SHOW PARTITIONS main.default.sales_data;
Table Management | SHOW COLUMNS IN <catalog_name>.<schema_name>.<table_name> | Lists all columns of a table, including their data types. | SHOW COLUMNS IN main.default.sales_data;
Table Management | DROP TABLE <catalog_name>.<schema_name>.<table_name> | Deletes a table from the catalog. | DROP TABLE main.default.sales_data;
Database Management | SHOW DATABASES | Lists all databases (schemas) in the environment. | SHOW DATABASES;
Database Management | DESCRIBE DATABASE <database_name> | Provides metadata about a specific database. | DESCRIBE DATABASE default;
Data Querying | SELECT * FROM <catalog_name>.<schema_name>.<table_name> | Queries data from a table. | SELECT * FROM main.default.sales_data WHERE region = 'West';
Table Creation | CREATE TABLE <catalog_name>.<schema_name>.<table_name> (<columns>) | Creates a managed table in Unity Catalog. | CREATE TABLE main.default.sales_data (id INT, region STRING, amount DOUBLE);


ArrayType, MapType columns and functions

In PySpark, ArrayType and MapType are used to define complex data structures within a DataFrame schema.

ArrayType column, and functions

ArrayType allows you to store and work with arrays, which can hold multiple values of the same data type.

sample dataframe:
+---+---------+
| id|  numbers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+

explode ()

explode() turns a given array into individual new rows, one per element. It is often used to flatten JSON.

from pyspark.sql.functions import explode

# Explode the 'numbers' array into separate rows
exploded_df = df.withColumn("number", explode(df.numbers))
display(exploded_df)
==output==
id	numbers	number
1	[1,2,3]	1
1	[1,2,3]	2
1	[1,2,3]	3
2	[4,5,6]	4
2	[4,5,6]	5
2	[4,5,6]	6
3	[7,8,9]	7
3	[7,8,9]	8
3	[7,8,9]	9
split ()

Splits a string on a specified delimiter and returns an array column.

from pyspark.sql.functions import split
df.withColumn("Name_Split", split(df["Name"], ","))

sample dataframe
+------------+
|Name        |
+------------+
|John,Doe    |
|Jane,Smith  |
|Alice,Cooper|
+------------+

from pyspark.sql.functions import split
# Split the 'Name' column by comma
df_split = df.withColumn("Name_Split", split(df["Name"], ","))

==output==
+-------------+----------------+
| Name        | Name_Split     |
+-------------+----------------+
| John,Doe    | [John, Doe]    |
| Jane,Smith  | [Jane, Smith]  |
| Alice,Cooper| [Alice, Cooper]|
+-------------+----------------+
array ()

Creates an array column.

from pyspark.sql.functions import array, col
data=[(1,2,3),(4,5,6)]
schema=['num1','num2','num3']
df1=spark.createDataFrame(data,schema)
df1.show()
# create a new column - numbers, array type. elements use num1,num2,num3   
df1.withColumn("numbers",array(col("num1"),col("num2"),col("num3"))).show()

==output==
+----+----+----+
|num1|num2|num3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

#new array column "numbers" created
+----+----+----+-----------+
|num1|num2|num3| numbers   |
+----+----+----+-----------+
|   1|   2|   3| [1, 2, 3] |
|   4|   5|   6| [4, 5, 6] |
+----+----+----+-----------+
array_contains ()

Checks if an array contains a specific element.

from pyspark.sql.functions import array_contains
array_contains(array, value)

sample dataframe
+---+-----------------------+
|id |fruits                 |
+---+-----------------------+
|1  |[apple, banana, cherry]|
|2  |[orange, apple, grape] |
|3  |[pear, peach, plum]    |
+---+-----------------------+

from pyspark.sql.functions import array_contains

# Using array_contains to check if the array contains 'apple'
df.select("id", array_contains("fruits", "apple").alias("has_apple")).show()

==output==
+---+----------+
| id|has_apple |
+---+----------+
|  1|      true|
|  2|      true|
|  3|     false|
+---+----------+
getItem()

Access individual elements of an array by their index using the getItem() method

# Select the second element (index start from 0) of the 'numbers' array
df1 = df.withColumn("item_1_value",   df.numbers.getItem(1))
display(df1)
==output==
id	numbers	      item_1_value
1	[1,2,3]	       2
2	[4,5,6]	       5
3	[7,8,9]	       8
size ()

Returns the size of the array.

from pyspark.sql.functions import size

# Get the size of the 'numbers' array
df.select(size(df.numbers)).show()

==output==
+-------------+
|size(numbers)|
+-------------+
|            3|
|            3|
|            3|
+-------------+
sort_array()

Sorts the array elements.

sort_array(col: 'ColumnOrName', asc: bool = True)

If asc is True (the default), the array is sorted in ascending order; if False, in descending order. Because True is the default, asc can be omitted for an ascending sort.

from pyspark.sql.functions import sort_array
df.withColumn("numbers", sort_array("numbers")).show()
==output==
ascending 
+---+---------+
| id|  numbers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+
df.select(sort_array("numbers", asc=False).alias("sorted_desc")).show()
==output==
descending 
+-----------+
|sorted_desc|
+-----------+
|  [3, 2, 1]|
|  [6, 5, 4]|
|  [9, 8, 7]|
+-----------+
concat ()

concat() is used to concatenate arrays (or strings) into a single array (or string). When dealing with ArrayType, concat() is typically used to combine two or more arrays into one.

from pyspark.sql.functions import concat
concat(*cols)

sample DataFrame
+---+------+------+
|id |array1|array2|
+---+------+------+
|1  |[a, b]|[x, y]|
|2  |[c]   |[z]   |
|3  |[d, e]|null  |
+---+------+------+

from pyspark.sql.functions import col, concat

# Concatenating array columns
df_concat = df.withColumn("concatenated_array", concat(col("array1"), col("array2")))
df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |null              |
+---+------+------+------------------+

Handling null Values

If any input column is null, concat() returns null for that row. That is why row 3 shows null instead of the non-null array [d, e].

To handle this, use coalesce() to substitute an empty array for null before calling concat(). coalesce() returns its first non-null argument:

from pyspark.sql.functions import array, coalesce, col, concat

# Define an empty array of the same element type as the input columns
empty_array = array().cast("array<string>")

# Concatenate with null handling using coalesce
df_concat = df.withColumn(
    "concatenated_array",
    concat(coalesce(col("array1"), empty_array), coalesce(col("array2"), empty_array))
)

df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |[d, e]            |
+---+------+------+------------------+
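arrays_zip ()

Combines arrays into a single array of structs, where the N-th struct holds the N-th element of each input array. In PySpark the function is named arrays_zip().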


MapType column, and functions

MapType represents key-value pairs, similar to a Python dictionary (dict).

from pyspark.sql.types import MapType, StringType, IntegerType
# Define a MapType
my_map = MapType(StringType(), IntegerType(), valueContainsNull=True)

Parameters:

  • keyType: Data type of the keys in the map. You can use PySpark data types like StringType(), IntegerType(), DoubleType(), etc.
  • valueType: Data type of the values in the map. It can be any valid PySpark data type
  • valueContainsNull: Boolean flag (optional). It indicates whether null values are allowed in the map. Default is True.

sample dataset
# Sample dataset (product ID and prices in various currencies)
data = [
    (1, {"USD": 100, "EUR": 85, "GBP": 75}),
    (2, {"USD": 150, "EUR": 130, "GBP": 110}),
    (3, {"USD": 200, "EUR": 170, "GBP": 150}),
]
df = spark.createDataFrame(data, ["product_id", "prices"])

sample dataframe
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+

Accessing map_keys (), map_values ()

Extract keys (currency codes) and values (prices):

from pyspark.sql.functions import col, map_keys, map_values
# Extract map keys and values
df.select(
    col("product_id"),
    map_keys(col("prices")).alias("currencies"),
    map_values(col("prices")).alias("prices_in_currencies")
).show(truncate=False)

==output==
+----------+---------------+--------------------+
|product_id|currencies     |prices_in_currencies|
+----------+---------------+--------------------+
|1         |[EUR, GBP, USD]|[85, 75, 100]       |
|2         |[EUR, GBP, USD]|[130, 110, 150]     |
|3         |[EUR, GBP, USD]|[170, 150, 200]     |
+----------+---------------+--------------------+
explode ()

Use explode() to flatten the map into multiple rows, where each key-value pair from the map becomes a separate row.

from pyspark.sql.functions import explode
# Use explode to flatten the map
df_exploded = df.select("product_id", explode("prices").alias("currency", "price"))
df_exploded.show()

==output==
+----------+--------+-----+
|product_id|currency|price|
+----------+--------+-----+
|         1|     EUR|   85|
|         1|     GBP|   75|
|         1|     USD|  100|
|         2|     EUR|  130|
|         2|     GBP|  110|
|         2|     USD|  150|
|         3|     EUR|  170|
|         3|     GBP|  150|
|         3|     USD|  200|
+----------+--------+-----+
Accessing specific elements in the map

To get the price for a specific currency (e.g., USD) for each product:

from pyspark.sql.functions import col
# Access the value for a specific key in the map 
df.select(
    col("product_id"),
    col("prices").getItem("USD").alias("price_in_usd")
).show(truncate=False)

==output==
+----------+------------+
|product_id|price_in_usd|
+----------+------------+
|1         |100         |
|2         |150         |
|3         |200         |
+----------+------------+
Filtering

Filter rows based on conditions involving the map values.

from pyspark.sql.functions import col
# Filter rows where price in USD is greater than 150
df.filter(col("prices").getItem("USD") > 150).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
map_concat ()

Combines two or more map columns by merging their key-value pairs.

from pyspark.sql.functions import col, create_map, lit, map_concat

# Define the additional currency as a new map using create_map()
additional_currency = create_map(lit("CAD"), lit(120))

# Add a new currency (e.g., CAD) with a fixed price to all rows
df.withColumn(
    "updated_prices",
    map_concat(col("prices"), additional_currency)
).show(truncate=False)

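==output==
+----------+------------------------------------+------------------------------------------------+
|product_id|prices                              |updated_prices                                  |
+----------+------------------------------------+------------------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |{EUR -> 85, GBP -> 75, USD -> 100, CAD -> 120}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|{EUR -> 130, GBP -> 110, USD -> 150, CAD -> 120}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|{EUR -> 170, GBP -> 150, USD -> 200, CAD -> 120}|
+----------+------------------------------------+------------------------------------------------+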


withColumnRenamed(), drop(), show()

withColumnRenamed ()

withColumnRenamed() renames an existing column in a DataFrame. If the existing column is not found, the DataFrame is returned unchanged.

DataFrame.withColumnRenamed(existingName, newName)

existingName: The current name of the column you want to rename.
newName: The new name for the column.

drop ()

In PySpark, you can drop columns from a DataFrame using the drop() method. Here’s a breakdown of the syntax, options, parameters, and examples for dropping columns in PySpark.

Syntax

DataFrame.drop(*cols)

*cols: One or more column names (as strings) that you want to drop from the DataFrame. Pass them as individual arguments; to use a list of column names, unpack it with * (for example, df.drop(*columns_to_drop)).

# Drop single column 'age'
df_dropped = df.drop("age")

# Drop multiple columns 'id' and 'age'
df_dropped_multiple = df.drop("id", "age")

# Dropping Columns Using a List of Column Names
# Define columns to drop
columns_to_drop = ["id", "age"]

# Drop columns using list
df_dropped_list = df.drop(*columns_to_drop)
df_dropped_list.show()
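show ()

By default, show() displays the first 20 rows and truncates values longer than 20 characters.

# n: number of rows to display
# truncate: True cuts values at 20 characters; an integer cuts at that length; False shows full values
# vertical: True prints each row as a list of column/value lines
df.show(n=10, truncate=True, vertical=True)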
