Logging in Azure Databricks with Python

In Azure Databricks, logging is crucial for monitoring, debugging, and auditing your notebooks, jobs, and applications. Since Databricks runs on a distributed architecture and utilizes standard Python, you can use familiar Python logging tools, along with features specific to the Databricks environment like Spark logging and MLflow tracking.

Python's logging module provides a versatile logging system for messages of different severity levels and controls their presentation. To get started with the logging module, you need to import it into your program first, as shown below:

import logging

logging.debug("A debug message")
logging.info("An info message")
logging.warning("A warning message")
logging.error("An error message")
logging.critical("A critical message")

Log levels in Python

Log levels define the severity of the event being logged. For example, a message logged at the INFO level indicates a normal, expected event, while one logged at the ERROR level signifies that an unexpected error has occurred.

Each log level in Python is associated with a number (from 10 to 50) and has a corresponding module-level method in the logging module as demonstrated in the previous example. The available log levels in the logging module are listed below in increasing order of severity:

Logging Level Quick Reference

Level          Meaning                   When to Use
DEBUG (10)     detailed debugging info   development
INFO (20)      normal messages           job progress
WARNING (30)   minor issue               non-critical issues
ERROR (40)     failure                   recoverable errors
CRITICAL (50)  system failure            stop the job

It’s important to always use the most appropriate log level so that you can quickly find the information you need. For instance, logging a message at the WARNING level will help you find potential problems that need to be investigated, while logging at the ERROR or CRITICAL level helps you discover problems that need to be rectified immediately.

By default, the logging module will only produce records for events that have been logged at a severity level of WARNING and above.
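
With those defaults, the five calls from the first example emit only:

WARNING:root:A warning message
ERROR:root:An error message
CRITICAL:root:A critical message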

Logging basic configuration

Be sure to place the call to logging.basicConfig() before any methods such as info(), warning(), and others are used. It should also be called only once, as it is a one-off configuration facility: if called multiple times, only the first call has an effect.
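
If you genuinely need to reconfigure logging in the same Python process (for example, after changing the format in a notebook), basicConfig(force=True) on Python 3.8+ removes the existing root handlers first; a minimal sketch:

import logging

# force=True (Python 3.8+) discards handlers already attached to the root
# logger, so this call takes effect even if basicConfig() ran earlier
logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)s: %(message)s",
    force=True,
)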

logging.basicConfig() example

import logging
import os
from datetime import datetime

# Use a timestamp to give each run its own log file
run_id = datetime.now().strftime("%Y%m%d_%H%M%S")

# Write the log to a file on DBFS
log_file = f"/dbfs/tmp/my_pipeline/logs/run_{run_id}.log"

# basicConfig fails if the directory does not exist, so create it first
os.makedirs(os.path.dirname(log_file), exist_ok=True)

# Configure the root logger once
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Create (or retrieve) a named logger that you will use to write log messages.
# The string "pipeline" is just a name for the logger; it can be anything:
# "etl", "my_app", "sales_job", "abc123", ...
logger = logging.getLogger("pipeline")

Simple logging example

import logging
import os
from datetime import datetime

run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = f"/dbfs/tmp/my_pipeline/logs/run_{run_id}.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)

logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pipeline")

logger.info("=== Pipeline Started ===")

try:
    logger.info("Step 1: Read data")
    df = spark.read.csv(...)
    
    logger.info("Step 2: Transform")
    df2 = df.filter(...)
    
    logger.info("Step 3: Write output")
    df2.write.format("delta").save(...)

    logger.info("=== Pipeline Completed Successfully ===")

except Exception as e:
    logger.error(f"Pipeline Failed: {e}")
    raise

The log output looks like this:

2025-11-21 15:05:01,112 - INFO - === Pipeline Started ===
2025-11-21 15:05:01,213 - INFO - Step 1: Read data
2025-11-21 15:05:01,315 - INFO - Step 2: Transform
2025-11-21 15:05:01,417 - INFO - Step 3: Write output
2025-11-21 15:05:01,519 - INFO - === Pipeline Completed Successfully ===

Log Rotation (Daily Files)

Log rotation means your log file does not grow forever. Instead, a new log file is automatically created on a schedule (each day, hour, week, etc.), and only a certain number of old files are kept.

This prevents:

  • huge log files
  • storage overflow
  • long-term disk growth
  • difficult debugging
  • slow I/O

It is very common in production systems (Databricks, Linux, app servers, databases). Without log rotation, a single file grows without bound; with daily rotation you get:

my_log.log (today)
my_log.log.2025-11-24 (yesterday)
my_log.log.2025-11-23
my_log.log.2025-11-22

Python code that does Log Rotation

import logging
from logging.handlers import TimedRotatingFileHandler

handler = TimedRotatingFileHandler(
    "/Volumes/my_catalog/my_schema/logs/my_log.log",
    when="midnight",   # rotate every day
    interval=1,        # 1 day
    backupCount=30     # keep the last 30 days
)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger = logging.getLogger("rotating_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Log rotation enabled")

Hourly Log Rotation

handler = TimedRotatingFileHandler(
    "/dbfs/tmp/mylogs/hourly.log",
    when="H",       # rotate every hour
    interval=1,
    backupCount=24  # keep the last 24 hours
)

Size-Based Log Rotation

from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    "/dbfs/tmp/mylogs/size.log",
    maxBytes=5 * 1024 * 1024,  # 5 MB
    backupCount=5              # keep 5 old files
)

Logging to Unity Catalog Volume (BEST PRACTICE)

Create a volume first (once):

CREATE VOLUME IF NOT EXISTS my_catalog.my_schema.logs;

Use it in code (logger_setup here is a module you create yourself, containing a get_logger helper like the one in the template below, with its log path pointed at the Volume):

from logger_setup import get_logger

logger = get_logger("customer_etl")

logger.info("Starting ETL pipeline")

Databricks ETL Logging Template (Production-Ready)

Features

  • Writes logs to file
  • Uses daily rotation (keeps 30 days)
  • Logs INFO, ERROR, stack traces
  • Works in notebooks + Jobs
  • Fully reusable

1. Create the logger (ready for copy & paste)

File version

import logging
import os
from logging.handlers import TimedRotatingFileHandler

def get_logger(name="etl"):
    log_path = "/dbfs/tmp/logs/pipeline.log"   # or a UC Volume

    # Make sure the log directory exists
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    handler = TimedRotatingFileHandler(
        log_path,
        when="midnight",
        interval=1,
        backupCount=30
    )

    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Prevent duplicate handlers in notebook re-runs
    if not logger.handlers:
        logger.addHandler(handler)

    return logger

logger = get_logger("my_pipeline")
logger.info("Logger initialized")

Unity Catalog Volume version

# If the folder doesn't exist:
dbutils.fs.mkdirs("/Volumes/my_catalog/my_schema/logs")

# Create logger pointing to the UC Volume
import logging
from logging.handlers import TimedRotatingFileHandler

def get_logger(name="etl"):
    log_path = "/Volumes/my_catalog/my_schema/logs/pipeline.log"

    handler = TimedRotatingFileHandler(
        filename=log_path,
        when="midnight",
        interval=1,
        backupCount=30           # keep last 30 days
    )

    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Prevent duplicate handlers when re-running notebook cells
    if not logger.handlers:
        logger.addHandler(handler)

    return logger

logger = get_logger("my_pipeline")
logger.info("Logger initialized")

2. Use the logger inside your ETL

logger.info("=== ETL START ===")

try:
    logger.info("Step 1: Read data")
    df = spark.read.csv("/mnt/raw/data.csv")

    logger.info("Step 2: Transform")
    df2 = df.filter("value > 0")

    logger.info("Step 3: Write output")
    df2.write.format("delta").mode("overwrite").save("/mnt/curated/data")

    logger.info("=== ETL COMPLETED ===")

except Exception as e:
    logger.error(f"ETL FAILED: {e}", exc_info=True)
    raise

Resulting log file (example)

The output file looks like this:

2025-11-21 15:12:01,233 - INFO - my_pipeline - === ETL START ===
2025-11-21 15:12:01,415 - INFO - my_pipeline - Step 1: Read data
2025-11-21 15:12:01,512 - INFO - my_pipeline - Step 2: Transform
2025-11-21 15:12:01,660 - INFO - my_pipeline - Step 3: Write output
2025-11-21 15:12:01,780 - INFO - my_pipeline - === ETL COMPLETED ===

If an error occurs:

2025-11-21 15:15:44,812 - ERROR - my_pipeline - ETL FAILED: File not found
Traceback (most recent call last):
  ...

Best Practices for Logging in Python

Logs let developers monitor, debug, and identify patterns that can inform product decisions. To get that value, make sure the logs you generate are informative, actionable, and scalable.

  1. Avoid the root logger
    Create a logger for each module or component in an application.
  2. Centralize your logging configuration
    Keep all logging configuration code in a single Python module.
  3. Use correct log levels
  4. Write meaningful log messages
  5. Prefer lazy %-style formatting over f-strings in log calls
  6. Log in a structured format such as JSON (see the sketch after this list)
  7. Include timestamps and ensure consistent formatting
  8. Keep sensitive information out of logs
  9. Rotate your log files
  10. Centralize your logs in one place
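
A minimal sketch of point 6 using only the standard library (the JsonFormatter class and its field names are illustrative choices):

import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object per line."""
    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("json_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Pipeline started")
# {"timestamp": "2025-11-21 15:05:01,112", "level": "INFO", "logger": "json_demo", "message": "Pipeline started"}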

Conclusion

To achieve the best logging practices, it is important to use appropriate log levels and message formats, and implement proper error handling and exception logging. Additionally, you should consider implementing log rotation and retention policies to ensure that your logs are properly managed and archived.

Technical Interview Questions and Answers

The Job Interview is the single most critical step in the job hunting process. It is the definitive arena where all of your abilities must integrate. The interview itself is not a skill you possess; it is the moment you deploy your Integrated Skill Set, blending:

  1. Hard Skills (Technical Mastery): Demonstrating not just knowledge of advanced topics, but the depth of your expertise and how you previously applied it to solve complex, real-world problems.
  2. Soft Skills (Communication & Presence): Clearly articulating strategy, managing complexity under pressure, and exhibiting the leadership presence expected of senior-level and expert-level candidates.
  3. Contextual Skills (Business Acumen): Framing your solutions within the company’s business goals and culture, showing that you understand the strategic impact of your work.

This Integrated Skill represents your first real opportunity to sell your strategic value to the employer.


Databricks

Azure Databricks / Delta / Unity Catalog / Lakehouse
What is Databricks Delta (Delta Lakehouse) and how does it enhance the capabilities of Azure Databricks?

Databricks Delta, now known as Delta Lake, is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It enhances Azure Databricks by providing features like:

  • ACID transactions for data reliability and consistency.
  • Scalable metadata handling for large tables.
  • Time travel for data versioning and historical data analysis (see the sketch below).
  • Schema enforcement and evolution.
  • Improved performance with data skipping and Z-ordering.
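
As a quick illustration of time travel in PySpark (the table path is illustrative):

# Read the current version of a Delta table
df_now = spark.read.format("delta").load("/mnt/dim/")

# Read an older snapshot by version number ...
df_v5 = spark.read.format("delta").option("versionAsOf", 5).load("/mnt/dim/")

# ... or by timestamp
df_old = spark.read.format("delta").option("timestampAsOf", "2025-11-01").load("/mnt/dim/")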
Are there any alternative solutions similar to the Delta Lakehouse?

There are several alternative technologies that provide Delta Lake-style lakehouse capabilities (ACID + schema enforcement + time travel + scalable storage + SQL engine), such as:

  • Apache Iceberg
  • Apache Hudi
  • Snowflake (Iceberg Tables / Unistore)
  • BigQuery + BigLake
  • AWS Redshift + Lake Formation + Apache Iceberg
  • Microsoft Fabric (OneLake + Delta/DQ/DLTS)
What is Delta Lake Table?

Delta Lake tables are tables that store data in the Delta format. Delta Lake is an extension to existing data lakes: it adds a transaction log on top of Parquet files, providing ACID guarantees and table versioning.

Explain how you can use Databricks to implement a Medallion Architecture (Bronze, Silver, Gold).
  1. Bronze Layer (Raw Data): Ingest raw data from various sources into the Bronze layer. This data is stored as-is, without any transformation.
  2. Silver Layer (Cleaned Data, also known as the Enriched layer): Clean and enrich the data from the Bronze layer. Apply transformations, data cleansing, and filtering to create more refined datasets.
  3. Gold Layer (Aggregated Data, also known as the Curated layer): Aggregate and further transform the data from the Silver layer to create high-level business tables or machine learning features. This layer is used for analytics and reporting.
What Is Z-Order (Databricks / Delta Lake)?

Z-Ordering in Databricks (specifically for Delta Lake tables) is an optimization technique designed to co-locate related information into the same set of data files on disk.

OPTIMIZE mytable
ZORDER BY (col1, col2);
What Is Liquid Clustering (Databricks)?

Liquid Clustering is Databricks’ next-generation data layout optimization for Delta Lake.
It replaces (and is far superior to) Z-Order.

ALTER TABLE sales
CLUSTER BY (customer_id, event_date);
What is a Dataframe, RDD, Dataset in Azure Databricks?

A DataFrame is a distributed collection of data arranged into two-dimensional rows and named columns, similar to a relational table, and is the structure most commonly used to hold data in Databricks at runtime.

RDD, Resilient Distributed Dataset, is a fault-tolerant, immutable collection of elements partitioned across the nodes of a cluster. RDDs are the basic building blocks that power all of Spark’s computations.

Dataset is an extension of the DataFrame API that provides compile-time type safety and object-oriented programming benefits. (The Dataset API is available in Scala and Java; in Python, you work with DataFrames.)

What is caching and what are its types?

A cache is temporary storage that holds frequently accessed data to reduce latency and improve speed; caching is the process of storing data in that cache. In Spark you cache a DataFrame with cache(), which uses the default storage level, or persist(), which lets you choose a storage level such as MEMORY_ONLY, MEMORY_AND_DISK, or DISK_ONLY.
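
A small sketch of both calls (df is any DataFrame):

from pyspark import StorageLevel

# Option 1: cache() uses the default storage level
df.cache()
df.count()        # caching is lazy; the first action materializes it
df.unpersist()    # release the cache when done

# Option 2: persist() lets you pick a storage level explicitly
df.persist(StorageLevel.DISK_ONLY)
df.count()
df.unpersist()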

How would you secure and manage secrets in Azure Databricks when connecting to external data sources?
  1. Use Azure Key Vault to store and manage secrets securely.
  2. Integrate Azure Key Vault with Azure Databricks using Databricks-backed or Azure Key Vault-backed secret scopes.
  3. Access secrets in notebooks and jobs using the dbutils.secrets API.
    dbutils.secrets.get(scope="<scope-name>", key="<key-name>")
  4. Ensure that secret access policies are strictly controlled and audited.
Scenario: You need to implement a data governance strategy in Azure Databricks. What steps would you take?

  • Data Classification: Classify data based on sensitivity and compliance requirements.
  • Access Controls: Implement role-based access control (RBAC) using Azure Active Directory.
  • Data Lineage: Use tools like Databricks Lineage to track data transformations and movement.
  • Audit Logs: Enable and monitor audit logs to track access and changes to data.
  • Compliance Policies: Implement Azure Policies and Azure Purview for data governance and compliance monitoring.
Scenario: You need to optimize a Spark job that has a large number of shuffle operations causing performance issues. What techniques would you use?
  1. Repartitioning: Repartition the data to balance the workload across nodes and reduce skew.
  2. Broadcast Joins: Use broadcast joins for small datasets to avoid shuffle operations (see the sketch after this list).
  3. Caching: Cache intermediate results to reduce the need for recomputation.
  4. Shuffle Partitions: Increase the number of shuffle partitions to distribute the workload more evenly.
  5. Skew Handling: Identify and handle skewed data by adding salt keys or custom partitioning strategies.
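
A minimal PySpark sketch of points 2 and 4, assuming a large fact table and a small dimension table (the table names are hypothetical):

from pyspark.sql import functions as F

orders = spark.read.table("orders")          # large fact table (hypothetical)
countries = spark.read.table("dim_country")  # small dimension (hypothetical)

# Broadcasting the small side ships it to every executor,
# so the large side is joined without a shuffle
joined = orders.join(F.broadcast(countries), "country_id")

# Tune the number of partitions used by the stages that do shuffle
spark.conf.set("spark.sql.shuffle.partitions", "400")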
Scenario: You need to migrate an on-premises Hadoop workload to Azure Databricks. Describe your migration strategy.
  • Assessment: Evaluate the existing Hadoop workloads and identify components to be migrated.
  • Data Transfer: Use Azure Data Factory or Azure Databricks to transfer data from on-premises HDFS to ADLS.
  • Code Migration: Convert Hadoop jobs (e.g., MapReduce, Hive) to Spark jobs and test them in Databricks.
  • Optimization: Optimize the Spark jobs for performance and cost-efficiency.
  • Validation: Validate the migrated workloads to ensure they produce the same results as on-premises.
  • Deployment: Deploy the migrated workloads to production and monitor their performance.


SQL / KQL

SQL
Can you talk about database locking?

Database locking is the mechanism a database uses to control concurrent access to data so that transactions stay consistent, isolated, and safe.

Locking prevents:

  • Dirty reads
  • Lost updates
  • Write conflicts
  • Race conditions

Types of Locks:

1. Shared Lock (S)

  • Used when reading data
  • Multiple readers allowed
  • No writers allowed

2. Exclusive Lock (X)

  • Used when updating or inserting
  • No one else can read or write the locked item

3. Update Lock (U) (SQL Server specific)

  • Prevents deadlocks when upgrading from Shared → Exclusive
  • Only one Update lock allowed

4. Intention Locks (IS, IX, SIX)

Used at table or page level to signal a lower-level lock is coming.

5. Row / Page / Table Locks

Based on granularity:

  • Row-level: Most common, best concurrency
  • Page-level: Several rows together
  • Table-level: When scanning or modifying large portions

DB engines automatically escalate:

Row → Page → Table
when there are too many small locks.

Can you talk on Deadlock?

A deadlock happens when:

  • Transaction A holds Lock 1 and wants Lock 2
  • Transaction B holds Lock 2 and wants Lock 1

Both wait on each other → neither can move → database detects → kills one transaction (“deadlock victim”).

Deadlocks usually involve two writers, but can also involve readers depending on the isolation level.

How to Troubleshoot Deadlocks?

A: In SQL Server, enable deadlock graph capture.
Turn on trace flags 1222 and 1204 to write deadlock details to the error log:

DBCC TRACEON (1222, -1);
DBCC TRACEON (1204, -1);

(On modern SQL Server versions, the system_health Extended Events session also captures deadlock graphs by default.)
B: Interpret the Deadlock Graph

You will see:

  • Processes (T1, T2…)
  • Resources (keys, pages, objects)
  • Types of locks (X, S, U, IX, etc.)
  • Which statement caused the deadlock

Look for:

  • Two queries touching the same index/rows in different order
  • A scanning query locking too many rows
  • Missed indexes
  • Query patterns that cause U → X lock upgrades
C: Identify
  • The exact tables/indexes involved
  • The order of locking
  • The hotspot row or range
  • Rows with heavy update/contention

This will tell you what to fix.

How to Prevent Deadlocks (Practical + Senior-Level)
  • Always update rows in the same order
  • Keep transactions short
  • Use appropriate indexes
  • Use the correct isolation level
  • Avoid long reads before writes

Can you discuss database normalization and denormalization?

Normalization is the process of structuring a relational database to minimize data redundancy (duplicate data) and improve data integrity.

  • 1NF (First): Eliminate repeating groups; ensure all column values are atomic (indivisible). Solves multi-valued columns and non-unique rows.
  • 2NF (Second): Be in 1NF, AND all non-key attributes must depend on the entire primary key. Solves partial dependency (a non-key attribute depending on part of a composite key).
  • 3NF (Third): Be in 2NF, AND eliminate transitive dependencies (a non-key attribute depending on another non-key attribute). Solves redundancy due to indirect dependencies.
  • BCNF (Boyce-Codd): A stricter version of 3NF; every determinant (a column that determines another column) must be a candidate key. Solves edge cases involving multiple candidate keys.

Denormalization is the process of intentionally introducing redundancy into a previously normalized database to improve read performance and simplify complex queries.

  • Adding Redundant Columns: Copying a value from one table to another (e.g., copying the CustomerName into the Orders table to avoid joining to the Customer table every time an order is viewed).
  • Creating Aggregate/Summary Tables: Storing pre-calculated totals, averages, or counts to avoid running expensive aggregate functions at query time (e.g., a table that stores the daily sales total).
  • Merging Tables: Combining two tables that are frequently joined into a single, wider table.


Markdown in a Databricks Notebook

Databricks Notebook Markdown is a special version of the Markdown language built directly into Databricks notebooks. It allows you to add richly formatted text, images, links, and even mathematical equations to your notebooks, turning them from just code scripts into interactive documents and reports.

Think of it as a way to provide context, explanation, and structure to your code cells, making your analysis reproducible and understandable by others (and your future self!).

Why is it Important?

Using Markdown cells effectively transforms your workflow:

  1. Documentation: Explain the purpose of the analysis, the meaning of a complex transformation, or the interpretation of a result.
  2. Structure: Create sections, headings, and tables of contents to organize long notebooks.
  3. Clarity: Add lists, tables, and links to data sources or external references.
  4. Communication: Share findings with non-technical stakeholders by narrating the story of your data directly alongside the code that generated it.

Key Features and Syntax with Examples

1. Headers (for Structure)

Use # to create different levels of headings.

%md
# Title (H1)
## Section 1 (H2)
### Subsection 1.1 (H3)
#### This is a H4 Header


2. Emphasis (Bold and Italic)

%md
*This text will be italic*
_This will also be italic_

**This text will be bold**
__This will also be bold__

**_You can combine them_**


3. Lists (Ordered and Unordered)

Unordered list:

%md
- Item 1
- Item 2
  - Sub-item 2.1
  - Sub-item 2.2

Ordered list:

%md
1. First item
2. Second item
   1. Sub-item 2.1
3. Third item

4. Links and Images

Link:

%md
[Databricks Website](https://databricks.com)

Image:

%md
![mainri Inc. Logo](https://mainri.ca/wp-content/uploads/2024/08/Logo-15-trans.png)

5. Tables

%md
| Column 1 Header | Column 2 Header | Column 3 Header |
|-----------------|-----------------|-----------------|
| Row 1, Col 1    | Row 1, Col 2    | Row 1, Col 3    |
| Row 2, Col 1    | Row 2, Col 2    | Row 2, Col 3    |
| *Italic Cell*   | **Bold Cell**   | Normal Cell     |

6. Code Syntax Highlighting (A Powerful Feature)

%md
```python
df = spark.read.table("samples.nyctaxi.trips")
display(df)
```

```sql
SELECT * FROM samples.nyctaxi.trips LIMIT 10;
```

```scala
val df = spark.table("samples.nyctaxi.trips")
display(df)
```

7. Mathematical Equations (LaTeX)

%md
$$
f(x) = \sum_{i=0}^{n} \frac{x^i}{i!}
$$

Summary

Feature        Purpose                            Example Syntax
Headers        Create structure and sections      ## My Section
Emphasis       Add bold/italic emphasis           **bold**, *italic*
Lists          Create bulleted or numbered lists  - Item or 1. Item
Tables         Organize data in a grid            | Header |
Links/Images   Add references and visuals         [Text](URL)
Code Blocks    Display syntax-highlighted code    ```python ... ```
Math (LaTeX)   Render mathematical formulas       $$E = mc^2$$

In essence, Databricks Notebook Markdown is the narrative glue that binds your code, data, and insights together, making your notebooks powerful tools for both analysis and communication.

PySpark Data sources

PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data from multiple formats, such as CSV, JSON, Parquet, and ORC; databases via JDBC; and more advanced formats like Avro, Delta Lake, and Hive tables.

Query Database Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
# Query table using jdbc()
# Parameters
server_name = "myserver.database.windows.net"
port = "1433"
database_name = "mydatabase"
table_name = "mytable"
username = "myusername"
password = "mypassword"

# Construct the JDBC URL
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Read data from the SQL database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
# show DataFrame
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

Query JDBC Table Parallel, specific Columns

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from emp where sex='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()
Read & Write CSV File

Read CSV File

df = spark.read.format("csv") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(["/Folder/", "/filePath2/"])

Write CSV File

df.write.format("csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/filePath/filename")

By default, if you try to write directly to a file name (e.g., name1.csv), you don't get a single file: Spark creates a directory with that name and writes a collection of part files inside it.

path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

Here, name1.csv in dbfs:/FileStore/name1.csv is treated as a directory rather than a file name. PySpark writes the single output file inside it with a random name like part-00000-<id>.csv:

dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

We need an additional step: rename the part file to name1.csv.

# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        # Note: the destination must not already exist as a directory;
        # the cleaner temp-folder pattern is shown in the next section
        destination_file = "dbfs:/FileStore/name1.csv"

        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break


display(dbutils.fs.ls ( "dbfs:/FileStore/"))

Direct Write with Custom File Name

PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

Steps:

  1. Use coalesce(1) to combine all data into a single partition.
  2. Save the file to a temporary location.
  3. Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Read & Write Parquet File

Sample data saved at /tmp/output/people.parquet:

dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet

read from parquet

df1 = spark.read.parquet("/tmp/output/people.parquet")
df1.show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

Reading a Parquet file is the same as reading a CSV; nothing special is required.

write to a parquet

  • append: appends the data from the DataFrame to the existing file if the destination files already exist. In case the destination files do not exist, it creates a new Parquet file in the specified location.

    df.write.mode("append").parquet("path/to/parquet/file")
  • overwrite: This mode overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.

    df.write.mode("overwrite").parquet("path/to/parquet/file")
  • ignore: If the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.

    df.write.mode("ignore").parquet("path/to/parquet/file")
  • Create a partitioned Parquet file:

    df.write.partitionBy("gender", "salary").mode("overwrite").parquet("/tmp/output/people2.parquet")

Read & write Delta Table in PySpark

Read from a Delta Table

Read from a Registered Delta Table (in the Catalog)
df = spark.read.table("default.dim_dep")

df.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+
Read from a Delta Table Stored in a Directory (Path-Based)
df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_delta_table.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Write to a Delta Table

Append to a Delta Table
# Appends to the Delta table
df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")  
Overwrite a Delta Table
# Overwrites the Delta table
df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")  
Create or Overwrite a Registered Delta Table
# Overwrites the table in the catalog
df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep") 
Append to a Registered Delta Table:
# Appends to the table in the catalog
df.write.format("delta").mode("append").saveAsTable("default.dim_dep")  

merge operation (upsert)

from delta.tables import DeltaTable

# Load the target Delta table and the incoming source data (paths are illustrative)
target_table_path = "dbfs:/mnt/dim/"
target_table = DeltaTable.forPath(spark, target_table_path)
source_df = spark.read.format("delta").load("dbfs:/mnt/dim_updates/")

# Perform the merge operation
target_table.alias("t").merge(
    source_df.alias("s"),
    "t.id = s.id"  # Join condition: match rows based on `id`
).whenMatchedUpdate(
    set={
        "name": "s.name",  # Update `name` column
        "date": "s.date"   # Update `date` column
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",      # Insert `id`
        "name": "s.name",  # Insert `name`
        "date": "s.date"   # Insert `date`
    }
).execute()

# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
  1. Target Table (target_table):
    • The Delta table is loaded using DeltaTable.forPath.
    • This table contains existing data where updates or inserts will be applied.
  2. Source DataFrame (source_df):
    • This DataFrame contains new or updated records.
  3. Join Condition ("t.id = s.id"):
    • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
  4. whenMatchedUpdate:
    • If a matching row is found, update the name and date columns in the target table.
  5. whenNotMatchedInsert:
    • If no matching row is found, insert the new record from the source DataFrame into the target table.
  6. execute():
    • Executes the merge operation, applying updates and inserts.
  7. Result Verification:
    • After the merge, the updated Delta table is read and displayed.

Schema Evolution

df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
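
For example, appending a DataFrame that carries an extra column (load_date here is illustrative) evolves the table schema instead of failing:

from pyspark.sql import functions as F

# df_new has a column the target table does not have yet
df_new = df.withColumn("load_date", F.current_date())

df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("dbfs:/mnt/dim/")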
Read & Write JSON file

Read a simple JSON file

# Read a simple JSON file into dataframe
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

df_simple = spark.read.format("json") \
.option("multiline","True")\
.load("dbfs:/FileStore/simple_json.json")

df_simple.printSchema()
root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)

df_simple.show()
+------------+------------+-----+-----------+-------+
|        City|RecordNumber|State|ZipCodeType|Zipcode|
+------------+------------+-----+-----------+-------+
|BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+------------+------------+-----+-----------+-------+

Pay attention to .option("multiline", "true"): since this JSON file spans multiple lines (see the sample data above), loading without the option will still succeed, but the resulting DataFrame will not work. As soon as you call show(), you will get this error:

df_simple = spark.read.format("json").load("dbfs:/FileStore/simple_json.json")
df_simple.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).

Reading from Multiline JSON (JSON Array) File

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
df_jsonarray_simple = spark.read\
    .format("json")\
    .option("multiline", "true")\
    .load("dbfs:/FileStore/jsonArrary.json")

df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

read complex json

df_complexjson=spark.read\
    .option("multiline","true")\
    .json("dbfs:/FileStore/jsonArrary2.json")

df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0001                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Cake                                                                                                                                      
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}                                                           
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}] 
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0002                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Raised                                                                                                                                    
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}]}                                                                                                                       
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]                                                           
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0003                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Old Fashioned                                                                                                                             
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}]}                                                                                                    
 topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]                                                                          

Reading from Multiple files at a time

# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json','resources/zipcode2.json'])

Reading all JSON files from a folder

# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")

Reading files with a user-specified custom schema

# Define custom schema
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, BooleanType)

schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()

Reading File using PySpark SQL

spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Write to JSON

Options

  • path: Specifies the path where the JSON files will be saved.
  • mode: Specifies the behavior when writing to an existing directory.
  • compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy").
    df2.write.option("compression", "gzip")
  • dateFormat: Specifies the format for date and timestamp columns.
    df.write.option("dateFormat", "yyyy-MM-dd")
  • timestampFormat: Specifies the format for timestamp columns.
    df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  • lineSep: Specifies the character sequence to use as a line separator between JSON objects: \n (Unix/Linux newline) or \r\n (Windows newline).
    df.write.option("lineSep", "\r\n")
  • encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings.
    df.write.option("encoding", "UTF-8")

df2.write.format("json") \
  .option("compression", "gzip") \
  .option("dateFormat", "yyyy-MM-dd") \
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
  .option("encoding", "UTF-8") \
  .option("lineSep", "\r\n") \
  .save("dbfs:/FileStore/output_json")

modes

  1. Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
  2. Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
  3. Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
  4. Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
Read & Write SQL Server

Read from SQL Server

server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"

jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

alternative way

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .load()

Select Specific Columns to Read

In the above example, it reads the entire table into a PySpark DataFrame. If you don't need the whole table, specify a SQL query via the query option to select only the columns and rows you want.

df1 = spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("query", "select * from [dbo].[dep] where depid>3") \
  .option("user", username) \
  .option("password", password) \
  .load()

df1.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    4|human resource|
|    5|         admin|
+-----+--------------+

write to table

write mode:

Append:
mode("append") to append the rows to the existing SQL Server table.

# We already defined these variables; shown again here
server_name = "mainri-sqldb.database.windows.net"
port = 1433
username = "my login name"
password = "my login password"
database_name = "mainri-sqldb"
table_name = "dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

# append the rows to the existing SQL Server table.

df_newrow.show()
+-----+-----+
|depid|  dep|
+-----+-----+
|    5|admin|
+-----+-----+

df_newrow.write \
  .format("jdbc") \
  .mode("append") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
|    5|         admin| <- new added row
+-----+--------------+

overwrite

The mode("overwrite") drops the table if it already exists by default and re-creates a new one without indexes. Use option("truncate", "true") to retain the indexes.

df_newrow.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
Read & Write MySQL

Read from MySQL

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Select Specific Columns to Read

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("numPartitions",4) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Write to MySQL

Some points to note while writing

  • To re-write the existing table, use the mode("overwrite"). This drops the table if already exists by default and re-creates a new one without indexes.
  • To retain the indexes, use option("truncate","true").
  • By default, the SQL Server Spark connector uses the READ_COMMITTED isolation level. To change this, use option("mssqlIsolationLevel", "READ_UNCOMMITTED").
  • The dbtable option is used in PySpark to specify the name of the table in a database that you want to read data from or write data to.
# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
Read JDBC in Parallel

Using the PySpark jdbc() method with the numPartitions option, you can read a database table in parallel. This option applies to both reading and writing, and it also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to the limit by calling coalesce(numPartitions) before writing. Note that for reads, numPartitions alone does not split the query: you must also supply partitionColumn, lowerBound, and upperBound (see the sketch after the next example).

# Read Table in Parallel
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable","employee") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
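
A sketch of a genuinely partitioned read, assuming employee.id is a numeric column whose values roughly span the given bounds:

# Spark issues numPartitions queries, each covering a slice of the id range
df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100000") \
    .option("numPartitions", 5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()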

Select columns with where clause

# Select columns with where clause
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Using fetchsize with numPartitions to Read

The fetchsize option specifies how many rows to fetch per round trip; many JDBC drivers default to a low value (for example, 10 rows for Oracle). A larger JDBC fetch size reduces round trips and helps driver performance, but do not set it to a very large number or you might see issues.

# Using fetchsize
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Read & Write delta table, Catalog, Hive Table

Read delta table, Catalog, Hive Table

# Read Hive table
df = spark.sql("select * from emp.employee")
df.show()
# Read Hive table
df = spark.read.table("employee")
df.show()
# Read delta table
df = spark.sql("select * from delta.`/path/to/table`")
df.show()

Caution: note the backticks (`) around the table path.

Write / Save

To save a PySpark DataFrame to Hive table use saveAsTable() function or use SQL CREATE statement on top of the temporary view.

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

use SQL, temporary view

# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView. 
spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")

# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

PySpark DataFrame

PySpark DataFrame is a distributed collection of rows, similar to a table in a relational database or a DataFrame in Python’s pandas library. It provides powerful tools for querying, transforming, and analyzing large-scale structured and semi-structured data.
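
A quick sketch of creating one from local data (the names and columns are illustrative):

data = [("James", 34), ("Ann", 28)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()
+-----+---+
| name|age|
+-----+---+
|James| 34|
|  Ann| 28|
+-----+---+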

PySpark apply functions

Apply a function to a column

from pyspark.sql.functions import upper
from pyspark.sql.types import StringType

df.withColumn("Upper_Name", upper(df.Name))
df.select("Seqno", "Name", upper(df.Name))

df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB")

# To call a Python function from SQL, register it as a UDF first
def upperCase(s):
    return s.upper()

spark.udf.register("upperCaseUDF", upperCase, StringType())
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB")

collect()

collect() is an action operation used to retrieve all the elements of the dataset (from all nodes) to the driver node.

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
dataCollect = deptDF.collect()
print(dataCollect)
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
Column Class

Access column

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
#root
# |-- name.fname: string (nullable = true)
# |-- gender: long (nullable = true)
+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

Column Operators

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 100|   2|   1|
| 200|   3|   4|
| 300|   4|   4|
+----+----+----+
#Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

df.select(df.col2 < df.col3).show()
+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

df.select(df.col2 == df.col3).show()
+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+
Convert DataFrame to Pandas

PySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.

pandasDF = pysparkDF.toPandas()
Convert RDD to DataFrame
df = rdd.toDF()
Create an empty DataFrame

#Create Schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Create an empty RDD
emptyRDD = spark.sparkContext.emptyRDD()

#Create empty DataFrame from empty RDD
df = spark.createDataFrame(emptyRDD,schema)
#Convert empty RDD to Dataframe
df1 = emptyRDD.toDF(schema)
#Create empty DataFrame directly.
df2 = spark.createDataFrame([], schema)
df2.printSchema()
dropDuplicates() & distinct()

Key differences between distinct() and dropDuplicates():

  • distinct() considers all columns when identifying duplicates, while dropDuplicates() allows you to specify a subset of columns to determine uniqueness.
  • distinct() treats NULL values as equal, so if there are multiple rows with NULL values in all columns, only one of them is retained after applying distinct().
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |  #duplicated
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df.distinct().show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
Distinct count: 9
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |  # <-James is removed
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

df2 = df.dropDuplicates(["department"])
print("Distinct count: "+str(df2.count()))
Distinct count: 3
df2.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
+-------------+----------+------+
fillna() & fill()

DataFrame.fillna() and DataFrameNaFunctions.fill() are used to replace NULL/None values.

# Prepare Data
data = [("James", None, 3000), \
    ("Michael", "Sales", None), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", None), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", None, 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |null      |3000  |
|Michael      |Sales     |null  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |null  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |null      |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|      null|  3000|
|      Michael|     Sales|     0|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|     0|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|      null|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
#Replace "unknown" for null, only on the department column
df.na.fill(value="unknown",subset=["department"]).show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|   unknown|  3000|
|      Michael|     Sales|  null|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  null|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar|   unknown|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
groupBy ( )

groupBy(), similar to the SQL GROUP BY clause, is a transformation used to group rows that have the same values in specified columns into summary rows.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()
+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|     Sales|   NY|     176000|     30000|
|     Sales|   CA|      81000|     23000|
|   Finance|   CA|     189000|     47000|
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
| Marketing|   CA|      80000|     18000|
+----------+-----+-----------+----------+
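
groupBy() is often paired with agg() to compute several aggregations at once and name the result columns; a sketch using the same DataFrame:

from pyspark.sql.functions import sum, avg, max

df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"),
         avg("salary").alias("avg_salary"),
         max("bonus").alias("max_bonus")) \
    .show()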
join ( )
  • Inner Join: Returns only the rows with matching keys in both DataFrames.
  • Left Join: Returns all rows from the left DataFrame and matching rows from the right DataFrame.
  • Right Join: Returns all rows from the right DataFrame and matching rows from the left DataFrame.
  • Full Outer Join: Returns all rows from both DataFrames, including matching and non-matching rows.
  • Left Semi Join: Returns all rows from the left DataFrame where there is a match in the right DataFrame.
  • Left Anti Join: Returns all rows from the left DataFrame where there is no match in the right DataFrame.
  • Self Join: Joins a DataFrame with itself, typically using aliases to tell the two sides apart.
# Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

# Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
# Inner join
deptDF.join(empDF, deptDF.dept_id==empDF.emp_dept_id, "inner").show()
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|dept_name|dept_id|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
|  Finance|     10|     1|   Smith|             -1|       2018|         10|     M|  3000|
|  Finance|     10|     3|Williams|              1|       2010|         10|     M|  1000|
|  Finance|     10|     4|   Jones|              2|       2005|         10|     F|  2000|
|Marketing|     20|     2|    Rose|              1|       2010|         20|     M|  4000|
|       IT|     40|     5|   Brown|              2|       2010|         40|      |    -1|
+---------+-------+------+--------+---------------+-----------+-----------+------+------+
# Left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
# Left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Returns: rows the left side has that also have a match on the right side (left-hand columns only).
# Left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Returns: rows the left side has that have no match on the right side.
# Self join
from pyspark.sql.functions import col

empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
orderBy( ) and sort( )

orderBy() and sort() are interchangeable; both sort the DataFrame by one or more columns.

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
# Sorting different columns in different orders

df.sort("state", "age",ascending=[False,True]).show()
df.sort(df["state"].desc(), df["age"].asc()).show()
df.orderBy("state", "age",ascending=[False,True]).show()
df.orderBy(df["state"].desc(), df["age"].asc()).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+
partitionBy ( )

partitionBy() is a DataFrameWriter method: when writing a DataFrame to storage, it partitions the output by the given column(s), creating one folder per partition value.
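
A minimal sketch of writing a partitioned Parquet dataset (the output path is an illustrative assumption):

# Creates one folder per state value, e.g. .../employees/state=NY/ (path is hypothetical)
df.write.partitionBy("state") \
    .mode("overwrite") \
    .parquet("/tmp/output/employees")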

pivot ( ) & Unpivot ( )

pivot() (Row to Column)

# Prepare data
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
# Output
root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

# Applying pivot()
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

# Output
root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

# one more example
pivotDF = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)
root
 |-- Country: string (nullable = true)
 |-- Banana: long (nullable = true)
 |-- Beans: long (nullable = true)
 |-- Carrots: long (nullable = true)
 |-- Orange: long (nullable = true)

+-------+------+-----+-------+------+
|Country|Banana|Beans|Carrots|Orange|
+-------+------+-----+-------+------+
|China  |400   |1500 |1200   |4000  |
|USA    |1000  |1600 |1500   |4000  |
|Mexico |null  |2000 |null   |null  |
|Canada |2000  |null |2000   |null  |
+-------+------+-----+-------+------+
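
For better performance, pivot() can also be given the list of values to pivot on, which saves Spark a pass over the data to discover them; a sketch (the list simply mirrors the countries above):

countries = ["USA", "China", "Canada", "Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate=False)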

Unpivot 

PySpark SQL doesn’t have an unpivot function, so the stack() function is used instead.

# Applying unpivot()
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+
sample(), sampleBy()

sample() is a mechanism to get random sample records from the dataset.

Syntax

sample(withReplacement, fraction, seed=None)

  • fraction – Fraction of rows to generate, range [0.0, 1.0]. Note that it doesn’t guarantee to provide the exact number of the fraction of records.
  • seed – Seed for sampling (default a random seed). Used to reproduce the same random sampling.
  • withReplacement – Sample with replacement or not (default False).
df=spark.range(100)
print(df.sample(0.06).collect())

#Output: [Row(id=0), Row(id=2), Row(id=17), Row(id=25), Row(id=26), Row(id=44), Row(id=80)]

In the example above, the DataFrame has 100 records; a 6% sample should be about 6 records, but sample() returned 7. This shows that sample() does not return the exact fraction specified.

To get the same random sample on every run, use the same seed value. Change the seed to get different results.

print(df.sample(0.1,123).collect())
# Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,123).collect())
# Output: 36,37,41,43,56,66,69,75,83

print(df.sample(0.1,456).collect())
# Output: 19,21,42,48,49,50,75,80

sampleBy()

sampleBy() performs stratified sampling: for each key value in col, it returns approximately the given fraction of rows.

sampleBy(col, fractions, seed=None)

df2=df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1: 0.2},0).collect())

# Output: [Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]
select ( )
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+
# Select columns by different ways
df.select("firstname","lastname").show()
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

# By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Nested Struct Columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+
# Select child columns
df2.select("name.firstname","name.lastname").show(truncate=False)
+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+
# Select all child columns
df2.select("name.*").show(truncate=False)
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+
show ( )
df.show()
+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+
df.show(truncate=False)
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)
-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
only showing top 3 rows
StructType & StructField

StructType

Defines the structure of the DataFrame. StructType represents a schema, which is a collection of StructField objects. A StructType is essentially a list of fields, each with a name and data type, defining the structure of the DataFrame. It allows for the creation of nested structures and complex data types.

StructField

StructField – Defines the metadata of the DataFrame column. It represents a field in the schema, containing metadata such as the name, data type, and nullable status of the field. Each StructField object defines a single column in the DataFrame, specifying its name and the type of data it holds.

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+

nested StructType

# nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+
transform ( )

transform() chains custom transformation functions: each function receives the DataFrame as its first argument and returns a new DataFrame.

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# PySpark transform() Usage
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount)
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+
UDF

UDF (User Defined Function)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
#create a python function
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 
#Convert a Python function to PySpark UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

convertUDF = udf(lambda z: convertCase(z), StringType())
# or equivalently
convertUDF = udf(convertCase, StringType())

# Apply the UDF to a DataFrame column (this produces the output below)
df.select(col("Seqno"), convertUDF(col("Name")).alias("Name")).show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

Registering PySpark UDF & use it on SQL

In order to use the convertCase() function in PySpark SQL, you need to register it with spark.udf.register().

spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+
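
Note that convertCase() raises an error when Name is null, because None has no split() method; a common hedge is a null-safe wrapper (a sketch, not part of the original example):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Illustrative null-safe version: pass nulls through unchanged
convertCaseSafe = udf(lambda s: convertCase(s) if s is not None else None, StringType())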
union ( ) & unionAll ( )

union ( ) & unionAll ( ) are the same result. unionAll is older, retired


df1
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+
df2
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
df1.union(df2).show()
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|<--duplicated
|Maria        |Finance   |CA   |90000 |24 |23000|<--duplicated
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+
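
Because union() keeps duplicates, chain distinct() (or dropDuplicates()) when SQL UNION semantics are wanted:

# Deduplicate after union
df1.union(df2).distinct().show()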
unionByName ( )

df1.unionByName(df2, allowMissingColumns=True)
The column order can differ between df1 and df2; with allowMissingColumns=True, even the schemas can differ.

df1
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
+-------+---+
df2
+---+-----+
| id| name|
+---+-----+
| 34|James|
| 45|Maria|
| 45|  Jen|
| 34| Jeff|
+---+-----+
# different columns order
df3 = df1.unionByName(df2)
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+
# different columns name and order 
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   5|   2|   6|
+----+----+----+

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   6|   7|   3|
+----+----+----+
df1.unionByName(df2, allowMissingColumns=True).show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+
where() & filter()

where() & filter() can replace each other

  • Use &, |, ~ for logical operations (AND, OR, NOT).
  • Use ==, !=, >, <, >=, <= for comparisons.
  • Always wrap column references in col() for clarity.
  • For SQL-like patterns, consider using functions like like, isin, and between.
  • IS NULL –> “isNull ( )”
  • IS NOT NULL –> “isNotNull ( )”
  • LIKE –> “like ( %abc% )”
  • IN –> “isin (18, 21, 25)”
  • BETWEEN –> “between(18, 25)”
+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
df.select("gender").filter(df.gender == "M").show()
df.select("gender").where(df.gender == "F").show()
+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     M|
+------+

+------+
|gender|
+------+
|     F|
|     F|
+------+
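
The logical operators and SQL-like helpers above can be combined in a single filter; a short sketch on the same DataFrame (languages is an array column, so array_contains() is used):

from pyspark.sql.functions import col, array_contains

# AND of two conditions
df.filter((col("state") == "OH") & (col("gender") == "M")).show(truncate=False)

# NOT combined with isin()
df.filter(~col("state").isin("OH")).show(truncate=False)

# Rows whose array column contains a value
df.filter(array_contains(col("languages"), "Java")).show(truncate=False)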
withColumn()
from pyspark.sql.functions import date_add, col

df.withColumn("dob", date_add("dob", 10)) \
    .withColumn("newsalary", col("salary")*100) \
    .drop("middlename").show()
+---------+--------+----------+------+------+---------+
|firstname|lastname|       dob|gender|salary|newsalary|
+---------+--------+----------+------+------+---------+
|    James|   Smith|1991-04-11|     M|   300|    30000|
|  Michael|        |2000-05-29|     M|   400|    40000|
|   Robert|Williams|1978-09-15|     M|   400|    40000|
|    Maria|   Jones|1967-12-11|     F|   400|    40000|
|      Jen|   Brown|1980-02-27|     F|    -1|     -100|
+---------+--------+----------+------+------+---------+
withColumnRenamed ( )

withColumnRenamed() renames a DataFrame column; we often need to rename one, several, or all columns of a PySpark DataFrame.

+--------------------+----------+------+------+
|                name|       dob|gender|salary|
+--------------------+----------+------+------+
|    {James, , Smith}|1991-04-01|     M|  3000|
|   {Michael, Rose, }|2000-05-19|     M|  4000|
|{Robert, , Williams}|1978-09-05|     M|  4000|
|{Maria, Anne, Jones}|1967-12-01|     F|  4000|
|  {Jen, Mary, Brown}|1980-02-17|     F|    -1|
+--------------------+----------+------+------+
df2 = df.withColumnRenamed("dob","DateOfBirth") \
    .withColumnRenamed("salary","salary_amount")
df2.show()
+--------------------+-----------+------+-------------+
|                name|DateOfBirth|gender|salary_amount|
+--------------------+-----------+------+-------------+
|    {James, , Smith}| 1991-04-01|     M|         3000|
|   {Michael, Rose, }| 2000-05-19|     M|         4000|
|{Robert, , Williams}| 1978-09-05|     M|         4000|
|{Maria, Anne, Jones}| 1967-12-01|     F|         4000|
|  {Jen, Mary, Brown}| 1980-02-17|     F|           -1|
+--------------------+-----------+------+-------------+

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

PySpark Built-in Functions

PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.

Visual Summary of Categories

  • Basic Functions: alias, cast, lit, col, when, isnull, isnan
  • String Functions: concat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap
  • Date and Time Functions: current_date, datediff, to_date, year, hour, unix_timestamp, date_format
  • Mathematical Functions: abs, round, floor, sqrt, pow, exp, log, sin, cos, rand
  • Aggregation Functions: count, sum, avg, min, max, stddev, collect_list
  • Array and Map Functions: array, size, array_contains, explode, map_keys, map_values
  • Null Handling Functions: isnull, na.fill, na.drop, na.replace
  • Window Functions: row_number, rank, ntile, lag, lead, cume_dist, percent_rank
  • Statistical Functions: corr, covar_samp, approx_count_distinct, percentile_approx
  • UDF and Advanced Functions: udf, udf for SQL, pandas_udf, broadcast, schema_of_json, to_json
sample DataFrames

df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
dfc:
+---+-----+-----------------------+
|id |color|current_datetime       |
+---+-----+-----------------------+
|1  |F    |2024-12-03 00:46:02.165|
|2  |E    |2024-12-03 00:46:02.165|
|3  |D    |2024-12-03 00:46:02.165|
|4  |G    |2024-12-03 00:46:02.165|
|6  |J    |2024-12-03 00:46:02.165|
|5  |I    |2024-12-03 00:46:02.165|
|7  |H    |2024-12-03 00:46:02.165|
|8  |K    |2024-12-03 00:46:02.165|
|9  |L    |2024-12-03 00:46:02.165|
+---+-----+-----------------------+

PySpark datetime related functions

PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.

date time Formatting

Common Date Format Patterns:
yyyy: Year
MM: Month
dd: Day
HH: Hour
mm: Minute
ss: Second

from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
          .show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime       |formatted_date|
+---+-----+-----------------------+--------------+
|1  |F    |2024-12-03 00:46:02.165|2024-12-03    |
|2  |E    |2024-12-03 00:46:02.165|2024-12-03    |
+---+-----+-----------------------+--------------+
Converting Between Types
  • to_date(column, format): Converts a string to a date.
  • unix_timestamp(column, format): Converts a string to a Unix timestamp.
from pyspark.sql.functions import to_date, unix_timestamp, current_date

dfc.withColumn("current_date", current_date()) \
    .withColumn("date_only", to_date("current_date")) \
    .withColumn("unix_time", unix_timestamp("current_date")) \
    .select("current_date", "date_only", "unix_time") \
    .show(2, truncate=False)
+------------+----------+----------+
|current_date|date_only |unix_time |
+------------+----------+----------+
|2024-12-06  |2024-12-06|1733443200|
|2024-12-06  |2024-12-06|1733443200|
+------------+----------+----------+
  • to_timestamp(column, format): Converts a string to a timestamp; the default format is yyyy-MM-dd HH:mm:ss.
  • from_unixtime(unix_time, format): Converts a Unix timestamp to a string.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit

df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
|    string|
+----------+
|2024-12-05|
+----------+

df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
|    string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+

df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
|    string|unixTimeStamp|
+----------+-------------+
|2024-12-05|   1733343200|
+----------+-------------+

df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
|   1733343200|                              2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
Date time Arithmetic / calculations
  • date_add ()
  • date_sub ()
  • add_months ()

df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()
+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )

PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.

from pyspark.sql.functions import col, current_date, datediff

df2 = df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(),col("date")).alias("datediff")
    )
+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2024-12-06|    1985|
|2019-06-24|  2024-12-06|    1992|
|2019-08-24|  2024-12-06|    1931|
+----------+------------+--------+
months_between ( )

PySpark SQL months_between() function to get the number of months between two dates

from pyspark.sql.functions import col, current_date, datediff, months_between, round

df3 = df.withColumn("today", current_date())\
    .withColumn("monthsDiff", months_between(current_date(), col("date"))) \
    .withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
+---+----------+----------+-----------+----------------+
| id|      date|     today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
|  1|2019-07-01|2024-12-06|65.16129032|           65.16|
|  2|2019-06-24|2024-12-06|65.41935484|           65.42|
|  3|2019-08-24|2024-12-06|63.41935484|           63.42|
+---+----------+----------+-----------+----------------+
Differences Between Dates in Years

utilize the months_between() function to get the difference in months and then convert it into years.

from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit

df4 = df.withColumn("today", current_date()) \
  .withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
  .withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))

+---+----------+----------+-----------------+---------------+
| id|      date|     today|        yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
|  1|2019-07-01|2024-12-06|5.430107526666667|           5.43|
|  2|2019-06-24|2024-12-06|5.451612903333333|           5.45|
|  3|2019-08-24|2024-12-06|5.284946236666666|           5.28|
+---+----------+----------+-----------------+---------------+
timediff ( )

timediff(column1, column2) calculates the difference between two time values.

trunc ( )

trunc(column, format) truncates a date to the given unit (month or year), setting it to the first day of that month/year.
e.g. 2024-10-08, truncate to month –> 2024-10-01; truncate to year –> 2024-01-01

df.select(col("input"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()
+----------+----------+-----------+
|     input|Month_Year|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval

interval is used for advanced time calculations; it is not a standalone DataFrame function, but works through PySpark SQL expressions.
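
Since interval arithmetic lives in Spark SQL, expr() is the usual bridge from the DataFrame API; a minimal sketch, assuming a date column named input as in the surrounding examples:

from pyspark.sql.functions import expr

df.select("input",
    expr("input + INTERVAL 2 DAYS").alias("plus_2_days"),
    expr("input - INTERVAL 1 MONTH").alias("minus_1_month")
).show()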


Extracting Components from a Datetime
  • year(column): Extracts the year.
  • quarter(column): Returns the quarter as an integer from a given date or timestamp.
  • dayofyear(column): Extracts the day of the year from a given date or timestamp.
  • dayofmonth(column): Extracts the day of the month.
  • dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
  • weekofyear(column): Returns the week number of the year.
  • last_day(column): Returns the last day of the month for a given date or timestamp column. The result is a date column where each date is the last day of the month for the original date.
  • next_day(column, day_of_week): Returns the first date later than the given date that falls on the specified day of the week (e.g. "Mon", "Sunday").
  • hour(column): Extracts the hour.
  • minute(column): Extracts the minute.
  • second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second

df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input                  |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1      |1    |2   |46    |2     |
|2023-01-11 15:35:32.265|2023|1      |1    |15  |35    |32    |
|2022-09-17 22:16:02.186|2022|3      |9    |22  |16    |2     |
+-----------------------+----+-------+-----+----+------+------+

from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format

df.select(date_format("input","yyyy-MM-dd").alias("input"), 
    dayofweek("input").alias('dayofweek'), 
     dayofmonth("input").alias('dayofmonth'),
     weekofyear("input").alias("weekofyear") ,
     next_day("input","mon").alias("nextday"),
     last_day("input").alias('lastday')
  ).show()
+----------+---------+----------+----------+----------+----------+
|     input|dayofweek|dayofmonth|weekofyear|   nextday|   lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01|        2|         1|         1|2024-01-08|2024-01-31|
|2023-01-11|        4|        11|         2|2023-01-16|2023-01-31|
|2022-09-17|        7|        17|        37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
Filtering – current_date, current_timestamp
  • current_date (),
  • current_timestamp ()
from pyspark.sql.functions import current_date, current_timestamp

dfc.withColumn ("current_date", current_date())\
    .withColumn ("current_timestamp", current_timestamp())\
    .select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp      |
+------------+-----------------------+
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
+------------+-----------------------+

PySpark string related functions

btrim (str[, trim] ), trim ( )

btrim(str[, trim]) removes the specified leading and trailing characters from the beginning and end of the string str.

  • trim (): Removes only whitespace from the beginning and end of a string.
    when you only need to clean up whitespace from strings.
  • btrim(str[, trim]): Removes all specified leading and trailing characters from a string.
    when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim, lit
+-------------+
|         text|
+-------------+
|   hello     |
|  !!spark!!  |
| **PySpark** |
+-------------+
df.withColumn("trimmed", trim("text")).show()
+-----------+-----------+
|       text|    trimmed|
+-----------+-----------+
|   hello   |      hello|
| !!spark!! |  !!spark!!|
|**PySpark**|**PySpark**|
+-----------+-----------+
df.withColumn("trimmed_custom", btrim("text", " !*")).show()
+-----------+--------------+
|       text|trimmed_custom|
+-----------+--------------+
|   hello   |         hello|
| !!spark!! |         spark|
|**PySpark**|       PySpark|
+-----------+--------------+
concat ( ) , concat_ws ()

Concatenates multiple string columns or expressions into a single string column, optionally with a specified delimiter between the values.

  • concat ( ) : No delimiter is added between the concatenated values.
    when you need strict concatenation without any delimiters.
  • concat_ws (): with a specified delimiter between the values.
    when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit

df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|      null|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+

df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|     Alice|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+
concat_ws() skips null values; concat() returns null if any input is null.
endswith ( )

endswith() checks whether a string column ends with the given suffix and returns a boolean column.

+--------------+
|          text|
+--------------+
|   hello world|
|PySpark is fun|
|       welcome|
+--------------+
df.select("text",(col("text").endswith("fun")).alias("end_with?")).show ()
+--------------+---------+
|          text|end_with?|
+--------------+---------+
|   hello world|    false|
|PySpark is fun|     true|
|       welcome|    false|
+--------------+---------+

df_filtered = df.filter(df["text"].endswith("fun"))
+--------------+
|          text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )

contains() checks whether a PySpark DataFrame column contains a specific substring.

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
|     full_name|contain?|
+--------------+--------+
|      John Doe|   false|
|    Jane Smith|    true|
|Robert Johnson|   false|
+--------------+--------+

df.filter(col("full_name").contains(substring_to_check)).show ()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )

find_in_set(str, str_array) provides the 1-based index of the string str in the comma-delimited list str_array (0 if not found).
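
A minimal sketch (find_in_set() is available in PySpark 3.5+; the sample rows are illustrative):

from pyspark.sql.functions import find_in_set

df_set = spark.createDataFrame([("b", "a,b,c"), ("d", "a,b,c")], ["item", "items"])
df_set.select("item", find_in_set("item", "items").alias("index")).show()
# 'b' -> 2, 'd' -> 0 (not found)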

length ( )

length() returns the number of characters for string data, or the number of bytes for binary data.

from pyspark.sql.functions import length

df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
|      text|char_length|
+----------+-----------+
|     hello|          5|
|   PySpark|          7|
|Databricks|         10|
+----------+-----------+
like ( )

like ( ) use && and || operators to have multiple conditions in Scala.

+--------------+
|          name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
|          name|   R%|  %th|%John%|
+--------------+-----+-----+------+
|      John Doe|false|false|  true|
|    Jane Smith|false| true| false|
|Robert Johnson| true|false|  true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
|          name|
+--------------+
|Robert Johnson|
+--------------+
startswith ( )

startswith() checks whether a string column begins with the given prefix and returns a boolean column.
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )

substring(str, pos[, len]): Returns the substring of str that starts at pos (1-based) and is of length len.

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+
from pyspark.sql.functions import substring

df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
|    Ja|       Ja|
|    An|       An|
|    To|       To|
|    To|       To|
+------+---------+
split ( )

PySpark – split() splits a string column into an array column, which can then be expanded into multiple columns.

from pyspark.sql.functions import split

df.select("full_name", split(df["full_name"], ",").alias("split_names")).show()
+--------------+-----------------+
|     full_name|      split_names|
+--------------+-----------------+
|      John,Doe|      [John, Doe]|
|    Jane,Smith|    [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+

split_columns = split(df["full_name"], ",")
df_with_split = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_with_split.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+

df_with_split2 = df.withColumn("split_names", split(df["full_name"], ","))
df_expanded = df_with_split2.select(
    "full_name",
    df_with_split2["split_names"].getItem(0).alias("first_name"),
    df_with_split2["split_names"].getItem(1).alias("last_name")
)
df_expanded.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+
translate ( )

translate() replaces characters one-for-one in a DataFrame column value: each character in the matching string is replaced by the character at the same position in the replacement string.

from pyspark.sql.functions import translate
df.withColumn ("replaced", translate("new_color", "aoml","A0N_")).show(3)
+-----+----------------+----------------+
|color|       new_color|        replaced|
+-----+----------------+----------------+
|    F|almost colorless|A_N0st c0_0r_ess|
|    E|            null|            null|
|    D|       colorless|       c0_0r_ess|
+-----+----------------+----------------+
regexp_replace ( )

PySpark – regexp_replace() replaces substrings of a column value that match a regex pattern with another string.

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

from pyspark.sql.functions import regexp_replace

df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)
+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address', 
    when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
   .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False)
+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+
overlay

PySpark – overlay(src, replace, pos[, len]) replaces the part of the src column starting at 1-based position pos with the replace column.

+---------------+----+
|           col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay

df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
|      overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
upper ( ), lower ( ), initcap ( )
  • upper: Converts all characters in the column to uppercase.
  • lower: Converts all characters in the column to lowercase.
  • initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
|             text|
+-----------------+
|      hello world|
|spark sql example|
|  UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap

df.withColumn("Uppercase", upper("text"))\
   .withColumn("lowercase", lower("text"))\
    .withColumn("Capitalized", initcap("text"))\
    .show()
+-----------------+-----------------+-----------------+-----------------+
|             text|        Uppercase|        lowercase|      Capitalized|
+-----------------+-----------------+-----------------+-----------------+
|      hello world|      HELLO WORLD|      hello world|      Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
|  UPPER and LOWER|  UPPER AND LOWER|  upper and lower|  Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+

Numeric Functions

Mathematical operations on numeric columns.

abs()

abs(): Absolute value.

from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )

round(): Round to a specific number of decimals.

from pyspark.sql.functions import round
df.select(round(df["column"], 2))
pow ( )

pow(): Power function.

from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))
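
A small runnable sketch combining these numeric functions (the sample values are illustrative):

from pyspark.sql.functions import abs, round, pow

df_num = spark.createDataFrame([(-3.14159,), (2.71828,)], ["value"])
df_num.select(abs("value").alias("abs"),
              round("value", 2).alias("rounded"),
              pow("value", 2).alias("squared")).show()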

Aggregate Functions

sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
distinct, countDistinct, approx_count_distinct
  • approx_count_distinct ( ): returns an approximate count of distinct items in a group (faster on large data)
  • countDistinct ( ): returns the exact count of distinct items in a group
  • distinct ( ): returns distinct rows
from pyspark.sql.functions import approx_count_distinct, countDistinct

df.select(approx_count_distinct("salary").alias("approx_count_distinct"), 
          countDistinct("salary").alias("countDistinct"),
          ).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
|                    6|            6|
+---------------------+-------------+

df.select("salary").distinct().show()
+------+
|salary|
+------+
|  3000|
|  4600|
|  4100|
|  3300|
|  3900|
|  2000|
+------+
avg, sum, sumDistinct, max (), min (), mean ( )
  • avg ( ): average of values in the input column
  • sum ( )
  • sumDistinct ( ): returns the sum of all distinct values in a column (newer versions name it sum_distinct())
  • max ( )
  • min ( )
  • mean ( )
from pyspark.sql.functions import avg,sum, max, min

df.select(avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
    ).show()
+------+-----+----+----+
|   avg|  sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+
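
In practice these aggregates are usually computed per group rather than over the whole DataFrame; a minimal sketch against the employee sample df above:

from pyspark.sql.functions import avg, sum, max, min

# aggregate salaries per department instead of across all rows
df.groupBy("department").agg(
    avg("salary").alias("avg_salary"),
    sum("salary").alias("total_salary"),
    max("salary").alias("max_salary"),
    min("salary").alias("min_salary"),
).show()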

sample df
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4| null|
+---+-----+
mean() specifically calculates the mean (it is an alias of avg()) and ignores nulls. It is used with select() or groupBy() and returns the mean of the specified columns.

from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
+----------+
|mean_value|
+----------+
|      20.0|
+----------+
first (), last ()
  • first() returns the first element in a column. When ignoreNulls is set to true, it returns the first non-null element.
  • last() returns the last element in a column. When ignoreNulls is set to true, it returns the last non-null element.
from pyspark.sql.functions import first, last

df.select(first("salary").alias("first"),
         last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
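
To see ignoreNulls in action, a minimal sketch on a hypothetical sample that contains nulls (note the lowercase ignorenulls parameter name in PySpark):

from pyspark.sql.functions import first, last

df_nulls = spark.createDataFrame([(None,), (10,), (20,), (None,)], "value int")
df_nulls.select(
    first("value").alias("first_default"),               # may return null
    first("value", ignorenulls=True).alias("first_nn"),  # skips nulls -> 10
    last("value", ignorenulls=True).alias("last_nn"),    # skips nulls -> 20
).show()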
collect_list ( )

PySpark – collect_list() returns all values from an input column with duplicates.

from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )

PySpark – collect_set() returns all values from an input column without duplicates.

from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+
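
Both collectors are typically applied per group; a minimal sketch gathering the distinct salaries of each department from the sample df:

from pyspark.sql.functions import collect_set

df.groupBy("department").agg(collect_set("salary").alias("salaries")) \
  .show(truncate=False)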

PySpark Window functions

PySpark’s Window ranking functions, like row_number(), rank(), and dense_rank(), assign sequential numbers to DataFrame rows based on specified criteria within defined partitions. These functions enable sorting and ranking operations, identifying row positions in partitions based on specific orderings.

  • row_number() assigns unique sequential numbers,
  • rank() provides the ranking with gaps,
  • dense_rank() offers ranking without gaps.
row_number ()

row_number() window function gives a sequential row number, starting from 1, to each row within a window partition.

from pyspark.sql.window import Window
from pyspark.sql.functions import col,  row_number

windowSpec  = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
    .where(col("row_number") < 4).show()

+---+---------+-----+----------+
|_c0|      cut|color|row_number|
+---+---------+-----+----------+
|677|     Fair|    D|         1|
|772|     Fair|    D|         2|
|940|     Fair|    D|         3|
| 43|     Good|    D|         1|
| 44|     Good|    D|         2|
|239|     Good|    D|         3|
| 63|    Ideal|    D|         1|
| 64|    Ideal|    D|         2|
|121|    Ideal|    D|         3|
| 55|  Premium|    D|         1|
| 62|  Premium|    D|         2|
|151|  Premium|    D|         3|
| 29|Very Good|    D|         1|
| 35|Very Good|    D|         2|
| 39|Very Good|    D|         3|
+---+---------+-----+----------+
rank ()

rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()

+-----+---------+-----+-----+----+
|  _c0|      cut|color|price|rank|
+-----+---------+-----+-----+----+
|   29|Very Good|    D|  357|   1|
|28262|Very Good|    D|  357|   1|
|28272|     Good|    D|  361|   3|
|28273|Very Good|    D|  362|   4|
|28288|Very Good|    D|  367|   5|
|31598|    Ideal|    D|  367|   5|
|31601|  Premium|    D|  367|   5|
|31602|  Premium|    D|  367|   5|
|31618|Very Good|    D|  373|   9|
|34922|Very Good|    D|  373|   9|
|38277|  Premium|    D|  386|  11|
|38278|  Premium|    D|  386|  11|
|38279|  Premium|    D|  386|  11|
|38280|  Premium|    D|  386|  11|
|41581|Very Good|    D|  388|  15|
|41582|Very Good|    D|  388|  15|
+-----+---------+-----+-----+----+
dense_rank ()

dense_rank() window function returns the rank of rows within a window partition without any gaps in the ranking.

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
|  _c0|color|price|dense_rank|
+-----+-----+-----+----------+
|   29|    D|  357|         1|
|28262|    D|  357|         1|
|28272|    D|  361|         2|
|28273|    D|  362|         3|
|28288|    D|  367|         4|
|31598|    D|  367|         4|
|31601|    D|  367|         4|
|31602|    D|  367|         4|
|31618|    D|  373|         5|
|34922|    D|  373|         5|
|38277|    D|  386|         6|
|38278|    D|  386|         6|
|38279|    D|  386|         6|
+-----+-----+-----+----------+
percent_rank ()

percent_rank() window function returns the relative rank of each row within its window partition as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).

from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0  |color|price|percent_rank         |
+-----+-----+-----+---------------------+
|29   |D    |357  |0.0                  |
|28262|D    |357  |0.0                  |
|28272|D    |361  |2.952465308532625E-4 |
|28273|D    |362  |4.428697962798937E-4 |
|28288|D    |367  |5.90493061706525E-4  |
|31598|D    |367  |5.90493061706525E-4  |
|31601|D    |367  |5.90493061706525E-4  |
|31602|D    |367  |5.90493061706525E-4  |
|31618|D    |373  |0.00118098612341305  |
|34922|D    |373  |0.00118098612341305  |
|38277|D    |386  |0.0014762326542663124|
|38278|D    |386  |0.0014762326542663124|
+-----+-----+-----+---------------------+
lag (), lead ( )
  • lag ( ) function allows you to access a previous row’s value within the partition based on a specified offset.
  • lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag,lead
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
  .withColumn("lead",lead("salary",2).over(windowSpec))\
  .show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
|        Maria|   Finance|  3000|null|3900|
|        Scott|   Finance|  3300|null|null|
|          Jen|   Finance|  3900|3000|null|
|        Kumar| Marketing|  2000|null|null|
|         Jeff| Marketing|  3000|null|null|
|        James|     Sales|  3000|null|4100|
|        James|     Sales|  3000|null|4100|
|       Robert|     Sales|  4100|3000|4600|
|         Saif|     Sales|  4100|3000|null|
|      Michael|     Sales|  4600|4100|null|
+-------------+----------+------+----+----+
ntile ( )

ntile(n) distributes the rows of each window partition into n buckets and returns the bucket number (1 to n) for each row.

from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+

PySpark JSON related functions

sample data
+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )

The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.

from pyspark.sql.functions import explode
explode(col)

col: The name of the column or an expression containing an array or map to be exploded.

Returns a new row for each element in the array or each key-value pair in the map.

# Usage with Arrays:
# Sample data
data = [
    (1, ["a", "b", "c"]),
    (2, ["d", "e"]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "letters"])

# Explode the array column
exploded_df = df.select("id", explode("letters").alias("letter"))
exploded_df.show()
+---+------+
| id|letter|
+---+------+
|  1|     a|
|  1|     b|
|  1|     c|
|  2|     d|
|  2|     e|
+---+------+

# Usage with Maps:
# Sample data
data = [
    (1, {"key1": "value1", "key2": "value2"}),
    (2, {"key3": "value3"}),
    (3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])

# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+-------+
| id| key|  value|
+---+----+-------+
|  1|key1| value1|
|  1|key2| value2|
|  2|key3| value3|
+---+----+-------+

work with JSON

Exploding a JSON Array

from pyspark.sql.functions import from_json, explode, col

# Sample JSON data
data = [
    ('{"id": 1, "values": ["a", "b", "c"]}',),
    ('{"id": 2, "values": ["d", "e"]}',),
    ('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data                           |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]}     |
|{"id": 3, "values": []}             |
+------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, values:array<string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values   |
+---+---------+
|1  |[a, b, c]|
|2  |[d, e]   |
|3  |[]       |
+---+---------+

# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    c|
|  2|    d|
|  2|    e|
+---+-----+

Exploding a JSON Map

from pyspark.sql.types import MapType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
    ('{"id": 2, "properties": {"key3": "value3"}}',),
    ('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data                                                    |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}}                  |
|{"id": 3, "properties": {}}                                  |
+-------------------------------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties                      |
+---+--------------------------------+
|1  |{key1 -> value1, key2 -> value2}|
|2  |{key3 -> value3}                |
|3  |{}                              |
+---+--------------------------------+

# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+
  • Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
  • Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed, as sketched below.
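A minimal sketch of that nested pattern, using a hypothetical record that nests an array of structs:

from pyspark.sql.functions import from_json, explode, col

# hypothetical nested JSON: each record carries an array of order structs
data = [('{"id": 1, "orders": [{"sku": "a", "qty": 2}, {"sku": "b", "qty": 1}]}',)]
df_nested = spark.createDataFrame(data, ["json_data"])

nested_schema = "struct<id:int, orders:array<struct<sku:string, qty:int>>>"

# parse the outer struct first, then explode the inner array of structs
parsed = df_nested.withColumn("parsed", from_json(col("json_data"), nested_schema))
parsed.select("parsed.id", explode("parsed.orders").alias("order")) \
      .select("id", "order.sku", "order.qty") \
      .show()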
from_json ( )

from_json ( ): Converts JSON string into Struct type or Map type.

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object

get_json_object () extracts a JSON element from a JSON string based on the JSON path specified.

from pyspark.sql.functions import get_json_object
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
json_tuple ( )

json_tuple() extracts elements from a JSON string and creates them as new columns.

from pyspark.sql.functions import json_tuple
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )

to_json () is used to convert DataFrame columns of MapType or StructType to a JSON string.

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )

schema_of_json ( ) function in PySpark is used to infer the schema of a JSON string literal (a foldable expression). It is particularly useful when you want to work with complex JSON data and need to define its schema for operations like parsing or transformation. It returns a Column holding a string representation of the schema in DDL format.

pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) → Column

from pyspark.sql.functions import schema_of_json, lit

# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'),
        ("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]

df = spark.createDataFrame(data, columns)

# schema_of_json expects a foldable (literal) JSON string rather than a data
# column, so pass a representative sample wrapped in lit()
schema = df.select(schema_of_json(lit('{"name": "Alice", "age": 30}'))).first()[0]
print(schema)

The printed schema is a DDL-style string similar to:

struct<age:bigint,name:string>
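
Once inferred, the schema string can be passed straight to from_json to parse the column; a minimal sketch continuing the example above:

from pyspark.sql.functions import from_json, col

# parse each JSON string with the inferred schema, then flatten the struct
parsed_df = df.withColumn("parsed", from_json(col("json_data"), schema))
parsed_df.select("id", "parsed.name", "parsed.age").show()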

expr ()

PySpark expr() is a SQL function that executes SQL-like expressions, letting you use an existing DataFrame column value as an expression argument to PySpark built-in functions.

#Using CASE WHEN similar to SQL.
df2 = df.withColumn("gender",
    expr("CASE WHEN gender = 'M' THEN 'Male' " +
         "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END"))
df2.show()
+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

#Add Month value from another column
df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+

lit ()

PySpark SQL functions lit() and typedLit() are used to add a new column to a DataFrame by assigning a literal or constant value. Both functions return a Column. typedLit() additionally provides a way to be explicit about the data type of the constant value being added.

from pyspark.sql.functions import col,lit
df2 = df.select(col("EmpId"),col("Salary"),lit("1").alias("lit_value1"))
df2.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|  111| 50000|         1|
|  222| 60000|         1|
|  333| 40000|         1|
+-----+------+----------+
typedLit ()

The difference between lit() and typedLit() is that typedLit() can handle collection types, e.g. Array, Dictionary (map), etc., and lets you state the literal's data type explicitly. Note that typedLit() belongs to the Scala API (org.apache.spark.sql.functions.typedLit) and is not exposed in pyspark.sql.functions; in PySpark the same effect is achieved by combining lit() with array(), create_map(), or an explicit cast.

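A minimal PySpark sketch of those stand-ins (the column names here are illustrative):

from pyspark.sql.functions import lit, array, create_map

# PySpark equivalents of typedLit() for literal collections
df5 = df.withColumn("flag", lit("flag")) \
        .withColumn("tags", array(lit("a"), lit("b"))) \
        .withColumn("props", create_map(lit("k"), lit("v")))
df5.show(truncate=False)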

stack ( )

stack ( ) function is used to transform columns into rows. It’s particularly useful when you have a wide DataFrame (with many columns) and want to “unpivot” or “melt” it into a longer format.

Syntax

stack(n: Int, exprs: String*): Column

Parameters

  • n: The number of rows to generate per input row; the expressions in exprs are split into n groups, and each group produces one output row.
  • exprs: A sequence of expressions, typically alternating label-value pairs such as 'A', A (a string literal naming the original column, followed by the column itself).
+---+---+---+---+
| id|  A|  B|  C|
+---+---+---+---+
|  1|100|200|300|
|  2|400|500|600|
+---+---+---+---+

# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
    "id",
    "stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|       A|  100|
|  1|       B|  200|
|  1|       C|  300|
|  2|       A|  400|
|  2|       B|  500|
|  2|       C|  600|
+---+--------+-----+
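
If your runtime is on Spark 3.4 or later, DataFrame.unpivot produces the same result without hand-writing a stack() expression; a minimal sketch:

# Spark 3.4+ equivalent of the stack() expression above
unpivoted_df2 = df.unpivot("id", ["A", "B", "C"], "variable", "value")
unpivoted_df2.show()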

StructType & StructField

The StructType and StructField classes in PySpark are used to specify a custom schema for a DataFrame and to create complex columns like nested struct, array, and map columns. StructType is a collection of StructField objects that define the column name, column data type, a boolean specifying whether the field can be nullable, and metadata.

Simple StructType and StructField

# Simple StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data, schema=schema)

Nested StructType object struct

# Defining schema using nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
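
With the nested schema in place, nested fields can be read with dot notation; a minimal sketch:

from pyspark.sql.functions import col

# select individual fields out of the nested name struct
df2.select(col("name.firstname"), col("name.lastname"), "salary").show()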

when ()

when() is PySpark’s counterpart to SQL’s CASE WHEN: it evaluates a chain of conditions and returns the first matching result, with otherwise() supplying the default value.

from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
    when(dfc.color == "F", "almost colorless")
    .when(dfc.color == "D", "colorless")
    .when(dfc.color == "G", "indistinguishable colorless")
    .when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
    .otherwise("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color                  |
+-----+---------------------------+
|F    |almost colorless           |
|E    |very colorful              |
|D    |colorless                  |
|G    |indistinguishable colorless|
|J    |almost colorless           |
|I    |almost colorless           |
|H    |very colorful              |
|K    |very colorful              |
|L    |very colorful              |
+-----+---------------------------+

# Using CASE WHEN with expr()
df3 = df.withColumn("new_gender", expr(
    "CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' " +
    "WHEN gender IS NULL THEN '' " +
    "ELSE gender END"))

# The same CASE WHEN in Spark SQL against a temp view
df.createOrReplaceTempView("EMP")
spark.sql("select name, " +
    "CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' " +
    "WHEN gender IS NULL THEN '' " +
    "ELSE gender END as new_gender from EMP").show()

Note: in both examples above, the SQL expression is built by concatenating the string segments one by one.


DeltaTable vs DataFrame

In Databricks and PySpark, DeltaTables and DataFrames both handle structured data but differ in functionality and use cases. Here’s a detailed comparison:

Definitions

DeltaTable

A Delta table is a storage format based on Apache Parquet, with support for ACID transactions, versioning, schema enforcement, and advanced file operations. It is managed by the Delta Lake protocol, offering features like time travel, upserts, and deletes; the DeltaTable class is the programmatic handle to such a table.

DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is an abstraction for structured and semi-structured data in Spark. It is a purely in-memory abstraction and does not directly manage storage or transactions.

Features

Feature | DeltaTable | DataFrame
Persistence | Stores data on disk in a managed format. | Primarily an in-memory abstraction (ephemeral).
Schema Enforcement | Enforces schema when writing/updating. | No schema enforcement unless explicitly specified.
ACID Transactions | Supports atomic writes, updates, and deletes. | Not transactional; changes require reprocessing.
Versioning | Maintains historical versions (time travel). | No versioning; a snapshot of data.
Upserts and Deletes | Supports MERGE, UPDATE, and DELETE. | Does not directly support these operations.
Performance | Optimized for storage (Z-order indexing, compaction). | Optimized for in-memory transformations.
Time Travel | Query historical data using snapshots. | No time travel support.
Indexing | Supports indexing (Z-order, data skipping). | No indexing capabilities.

Use Cases

DeltaTable

Ideal for persistent storage with advanced capabilities:

  • Data lakes or lakehouses.
  • ACID-compliant operations (e.g., MERGE, DELETE).
  • Time travel to access historical data.
  • Optimizing storage with compaction or Z-ordering.
  • Schema evolution during write operations.

DataFrame

Best for in-memory processing and transformations:

  • Ad-hoc queries and ETL pipelines.
  • Working with data from various sources (files, databases, APIs).
  • Temporary transformations before persisting into Delta or other formats.

Common APIs

DeltaTable

Load Delta table from a path:

from delta.tables import DeltaTable 
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

Merge data:

delta_table.alias("target").merge(
    source_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Time Travel:

df = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/delta/table")

Optimize (Spark SQL):

OPTIMIZE delta.`/path/to/delta/table` ZORDER BY (column_name);

DataFrame

Read

df = spark.read.format("parquet").load("/path/to/data")

Transformations

transformed_df = df.filter(df.age > 30).groupBy("gender").count()

Write

df.write.format("delta").save("/path/to/save")

Transition Between DeltaTables and DataFrames

Convert DeltaTable to DataFrame:

df = delta_table.toDF()

Write DataFrame to Delta format:

df.write.format("delta").save("/path/to/delta/table")


Summary of Dataframe Methods

Category | Method | Example
Inspection | printSchema() | df.printSchema()
Inspection | columns | df.columns
Selection | select() | df.select("col1", "col2").show()
Selection | withColumn() | df.withColumn("new_col", col("col1") + 1)
Selection | withColumnRenamed() | df.withColumnRenamed("old", "new")
Selection | distinct() | df.select("cut").distinct().show()
Selection | take(5) | df.take(5)  # retrieve the first 5 rows
Selection | drop() | df.drop("col").show()  # drop a column
Filtering | filter() / where() | df.filter(df.col1 > 10).show() / df.where(df.col1 > 10).show()
Aggregations | groupBy().agg() | df.groupBy("col").agg(sum("val")).show()
Aggregations | count() | df.count()
Joins | join() | df1.join(df2, df1.id == df2.id, "inner")
Joins | left join | df1.join(df2, df1.id == df2.id, "left_outer").show()
Sorting | orderBy() / sort() | df.orderBy("col1").show() / df.sort(df.col1.desc()).show()
Null Handling | dropna(), fillna() | df.fillna({"col1": 0}).show()
Null Handling | isNotNull() | df.filter(col("cut").isNotNull()).show()
Null Handling | isNull() | df.filter(col("cut").isNull()).show()
Null Handling | dropna() | df.dropna(subset=["col1"]).show()
Date/Time | year(), month(), dayofmonth() | df.withColumn("year", year("date_col"))
Writing | write.format() | df.write.csv("path/to/csv", header=True); df.write.json("path/to/json")
Save as Table | write.format().mode().saveAsTable() | df.write.format("delta").saveAsTable("my_table")
Create as View | createOrReplaceTempView() | df.createOrReplaceTempView("temp_view_name")
Create as View | createOrReplaceGlobalTempView() | df.createOrReplaceGlobalTempView("global_temp_view_name")
String Ops | upper(), concat() | df.withColumn("upper", upper("col"))
Partitioning | partitionBy("col") | df.write.partitionBy("department").parquet("output/parquet_data")
Partitioning | repartition(4) | df.repartition(4).show()  # repartition into 4 partitions
Partitioning | coalesce(2) | df.coalesce(2).show()  # reduce to 2 partitions


Overview of Commonly Used Unity Catalog and Spark SQL Management Commands

Summary of frequently used Unity Catalog and Spark SQL management commands, organized in a table.

Category | Command | Description | Example
Catalog Management | SHOW CATALOGS | Lists all available catalogs. | SHOW CATALOGS;
Schema Management | SHOW SCHEMAS IN <catalog_name> | Lists schemas (databases) within a catalog. | SHOW SCHEMAS IN main;
Schema Management | DESCRIBE SCHEMA <catalog_name>.<schema_name> | Provides metadata about a specific schema. | DESCRIBE SCHEMA main.default;
Table Management | SHOW TABLES IN <catalog_name>.<schema_name> | Lists all tables in a schema. | SHOW TABLES IN main.default;
Table Management | DESCRIBE TABLE <catalog_name>.<schema_name>.<table_name> | Displays metadata about a specific table. | DESCRIBE TABLE main.default.sales_data;
Table Management | SHOW PARTITIONS <catalog_name>.<schema_name>.<table_name> | Lists partitions of a partitioned table. | SHOW PARTITIONS main.default.sales_data;
Table Management | SHOW COLUMNS IN <catalog_name>.<schema_name>.<table_name> | Lists all columns of a table, including their data types. | SHOW COLUMNS IN main.default.sales_data;
Table Management | DROP TABLE <catalog_name>.<schema_name>.<table_name> | Deletes a table from the catalog. | DROP TABLE main.default.sales_data;
Database Management | SHOW DATABASES | Lists all databases (schemas) in the environment. | SHOW DATABASES;
Database Management | DESCRIBE DATABASE <database_name> | Provides metadata about a specific database. | DESCRIBE DATABASE default;
Data Querying | SELECT * FROM <catalog_name>.<schema_name>.<table_name> | Queries data from a table. | SELECT * FROM main.default.sales_data WHERE region = 'West';
Table Creation | CREATE TABLE <catalog_name>.<schema_name>.<table_name> (<columns>) | Creates a managed table in Unity Catalog. | CREATE TABLE main.default.sales_data (id INT, region STRING, amount DOUBLE);
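
These commands can be run in a notebook SQL cell or from Python through spark.sql(); a minimal sketch (catalog and schema names are illustrative):

# run a management command from PySpark; the result comes back as a DataFrame
tables_df = spark.sql("SHOW TABLES IN main.default")
tables_df.show(truncate=False)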


Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS

When your Delta tables reside in Blob Storage or Azure Data Lake Storage (ADLS), you interact with them directly using their file paths. This differs from how you might access tables managed within a metastore like Unity Catalog, where you’d use a cataloged name.

Reading Delta Tables from Blob Storage or ADLS

To read Delta tables from Blob Storage or ADLS, specify the path to the Delta table and use the delta format.

Syntax

# Spark SQL (caution: " ` " - backticks around the path)
SELECT * FROM delta.`/mnt/path/to/delta/table`

# PySpark
df = spark.read.format("delta").load("path/to/delta/table")

Writing Delta Tables to Blob Storage or ADLS

When writing to Delta tables, use the delta format and specify the path where you want to store the table.

Spark SQL has no DataFrame-style write API for a path in Blob or ADLS (use PySpark for that). However, you can still write through SQL by inserting into a path-based Delta table with INSERT INTO:

# Spark SQL (caution: " ` " - backticks around the path)
INSERT INTO delta.`/mnt/path/to/delta/table`
SELECT * FROM my_temp_table

# PySpark 
df.write.format("delta").mode("overwrite").save("path/to/delta/table")

Options and Parameters for Delta Read/Write

Options for Reading Delta Tables:

You can configure the read operation with options like:

  • mergeSchema: Allows schema evolution if the structure of the Delta table changes.
  • spark.sql.files.ignoreCorruptFiles: Ignores corrupt files during reading.
  • versionAsOf / timestampAsOf: Query older versions of the Delta table (time travel); see the examples further below.
df = spark.read.format("delta").option("mergeSchema", "true").load("path/to/delta/table")
df.show()

Options for Writing Delta Tables:

mode: Controls the write mode.

  • overwrite: Overwrites the existing data.
  • append: Adds to existing data.
  • ignore: Ignores the write if data exists.
  • errorifexists: The default; throws an error if data already exists.

partitionBy: Partition the data by one or more columns.

overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("column_name") \
    .save("path/to/delta/table")

Time Travel and Versioning with Delta (PySpark)

Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()

# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()

Conclusion:

  • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
  • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
  • For reading, use .format("delta").load("path").
  • For writing, use .write.format("delta").save("path").

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)