The join() method is used to combine two DataFrames based on a common column or multiple columns. This method is extremely versatile, supporting various types of SQL-style joins such as inner, outer, left, and right joins.
Syntax
DataFrame.join(other, on=None, how=None)
Parameters
other: The other DataFrame to join with the current DataFrame.
on: A string or a list of column names on which to join. This can also be an expression (using col() or expr()).
how: The type of join to perform. It can be one of
'inner': Inner join (default). Returns rows that have matching values in both DataFrames.
'outer' or 'full': Full outer join. Returns all rows from both DataFrames, with null values for missing matches.
'left' or 'left_outer': Left outer join. Returns all rows from the left DataFrame and matched rows from the right DataFrame. Left rows with no match get null values in the columns that come from the right DataFrame.
'right' or 'right_outer': Right outer join. Returns all rows from the right DataFrame and matched rows from the left DataFrame. Right rows with no match get null values in the columns that come from the left DataFrame.
'left_semi': Left semi join. Returns only the rows from the left DataFrame where the join condition is satisfied.
'left_anti': Left anti join. Returns only the rows from the left DataFrame where no match is found in the right DataFrame.
'cross': Cross join (Cartesian product). Returns the Cartesian product of both DataFrames, meaning every row from the left DataFrame is combined with every row from the right DataFrame.
The other join types behave the same as in SQL. Now, let’s focus on semi and anti joins.
Left Semi Join: Only returns rows from the left DataFrame that have matches in the right DataFrame.
Left Anti Join: Only returns rows from the left DataFrame that don’t have matches in the right DataFrame.
# Left semi join
df_left_semi = df1.join(df2, on="dept_id", how="left_semi")
df_left_semi.show()
==output==
+-------+-----+
|dept_id| name|
+-------+-----+
|      1|Alice|
|      2|  Bob|
+-------+-----+
Charlie's dept_id is 3, which does not appear in df2, so his row is skipped.
# Left anti join
df_left_anti = df1.join(df2, on="dept_id", how="left_anti")
df_left_anti.show()
==output==
+-------+-------+
|dept_id|   name|
+-------+-------+
|      3|Charlie|
+-------+-------+
Charlie's dept_id is 3, which does not appear in df2, so his is the only row returned.
union()
The union() method is used to combine two DataFrames with the same schema (i.e., same number and type of columns). This operation concatenates the rows of the two DataFrames, similar to a SQL UNION operation, but without removing duplicate rows (like UNION ALL in SQL).
Syntax
DataFrame.union(other)
Key Points
Schema Compatibility: Both DataFrames must have the same number of columns, and the data types of the corresponding columns must match or be compatible (for example, Spark widens int and long to long).
Union Behavior: Unlike SQL’s UNION which removes duplicates, union() in PySpark keeps all rows, including duplicates. This is equivalent to SQL’s UNION ALL.
Order of Rows: The rows from the first DataFrame typically appear first, followed by the rows from the second DataFrame, although Spark does not guarantee row order unless you sort explicitly with orderBy().
Column Positions and Data Types Must Match: The column names don’t need to be identical in both DataFrames, but their positions and data types must match. If the number of columns or the data types don’t align, an error will be raised.
Union with Different Column Names: Even though column names don’t need to be the same, the columns are merged by position, not by name. If you attempt to union() DataFrames with different column orders, the results could be misleading. Therefore, it’s important to make sure the schemas match.
unionByName()
The unionByName() method in PySpark is similar to the union() method but with a key distinction: it merges two DataFrames by aligning columns based on column names, rather than their positions.
Syntax
DataFrame.unionByName(other, allowMissingColumns=False)
Parameters
other: The other DataFrame to be unioned with the current DataFrame.
allowMissingColumns: A boolean flag (False by default). If True, this allows the union of DataFrames even if one DataFrame has columns that are missing in the other. The missing columns in the other DataFrame will be filled with null values.
Key Points
Column Name Alignment: The method aligns columns by name, not by their position, which makes it flexible for combining DataFrames that have different column orders.
Handling Missing Columns: By default, if one DataFrame has columns that are missing in the other, PySpark will throw an error. However, setting allowMissingColumns=True allows the union in such cases, and any column that is missing from one DataFrame is filled with null values for that DataFrame’s rows.
Duplicate Rows: Just like union(), unionByName() does not remove duplicates, and the result includes all rows from both DataFrames.
unionAll() was an older method used to combine two DataFrames without removing duplicates. However, starting from PySpark 2.0, unionAll() has been deprecated and replaced by union(). The behavior of unionAll() is now identical to that of union() in PySpark.
See the union() section above for details.
fillna(), df.na.fill()
fillna() is a method used to replace null (or missing) values in a DataFrame with a specified value. It returns a new DataFrame with null values replaced by the specified value.
Syntax
DataFrame.fillna(value, subset=None)
df.na.fill(value, subset=None)
df.na.fill(value, subset=None) is an alias of df.fillna() and produces the same result.
Parameters
value: The value to replace null with. It can be a scalar value (applied across all columns) or a dictionary (to specify column-wise replacement values). The type of value should match the type of the column you are applying it to (e.g., integers for integer columns, strings for string columns).
subset: Specifies the list of column names where the null values will be replaced. If not provided, the replacement is applied to all columns whose data type matches the value.
Sample DataFrame
+-------+----+----+
|   name| age|dept|
+-------+----+----+
|  Alice|  25|null|
|    Bob|null|  HR|
|Charlie|  30|null|
+-------+----+----+
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- dept: string (nullable = true)
"age" is long, "dept" is string
# Fill null values with default values for all columns
df_filled = df.fillna(0)
==output==
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
Bob's age is filled with 0 because "age" is a long column. The "dept" column is a string column, so the numeric fill value is not applied to it and its nulls remain.
Handling Columns with Different Data Types
# Fill nulls in the 'age' column with 0 and in the 'dept' column with "Unknown"
df_filled_columns = df.fillna({"age": 0, "dept": "Unknown"})
df_filled_columns.show()
==output==
+-------+---+-------+
|   name|age|   dept|
+-------+---+-------+
|  Alice| 25|Unknown|
|    Bob|  0|     HR|
|Charlie| 30|Unknown|
+-------+---+-------+
Fill Nulls in Specific Subset of Columns
# Fill null values only in the 'age' column
df_filled_age = df.fillna(0, subset=["age"])
df_filled_age.show()
==output==
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
select()
select() is used to project a subset of columns from a DataFrame or to create new columns based on expressions. It returns a new DataFrame containing only the selected columns or expressions.
In PySpark, there isn’t an explicit “if-else” statement construct like in regular Python. Instead, PySpark provides several ways to implement conditional logic, such as when(), otherwise(), withColumn(), expr(), and UDFs.
when() and expr() are covered in the following sections. For now, let’s focus on using a UDF to implement “if-then-else” logic.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a Python function for age classification
def classify_age(age):
    if age >= 18:
        return 'Adult'
    else:
        return 'Minor'
# Register the function as a UDF
classify_age_udf = udf(classify_age, StringType())
# Create the DataFrame
data = [(1, 25), (2, 12), (3, 40)]
df = spark.createDataFrame(data, ["id", "age"])
+---+---+
| id|age|
+---+---+
|  1| 25|
|  2| 12|
|  3| 40|
+---+---+
# Apply the UDF to create a new column with the if-else logic
df = df.withColumn("age_group", classify_age_udf(df["age"]))
df.show()
+---+---+---------+
| id|age|age_group|
+---+---+---------+
|  1| 25|    Adult|
|  2| 12|    Minor|
|  3| 40|    Adult|
+---+---+---------+
In this example:
The function classify_age behaves like a Python if-else statement.
The UDF (classify_age_udf) applies this logic to the DataFrame.
when(), otherwise()
The when() function in PySpark is used for conditional expressions, similar to SQL’s CASE WHEN clause.
Syntax
from pyspark.sql.functions import when
when(condition, value).otherwise(default_value)
Parameters
condition: A condition that returns a boolean (True/False). If this condition is true, the function will return the specified value.
value: The value to return when the condition is true.
otherwise(default_value): An optional method that specifies the default value to return when no condition is true. If otherwise() is omitted, rows that match no condition get null.
expr()
The expr() function allows you to use SQL expressions as a part of the DataFrame transformation logic.
Syntax
from pyspark.sql.functions import expr
expr(sql_expression)
Parameters:
sql_expression: A string containing a SQL-like expression to be applied to the DataFrame. It can contain any valid SQL operations, such as arithmetic operations, column references, conditional logic, or function calls.
The selectExpr() method allows you to directly select columns or apply SQL expressions on multiple columns simultaneously, similar to SQL’s SELECT statement.
Syntax
df.selectExpr(*sql_expressions)
Parameters:
sql_expressions: One or more SQL-like expressions (as strings) that define how to transform or select columns. Each expression can involve selecting a column, renaming it, applying arithmetic, or adding conditions using SQL syntax.
# selectExpr() is a DataFrame method, so no import is needed.
# Assumes df has the columns id and value, with rows (1, 10), (2, 20), (3, 30)
# Select specific columns and rename them using selectExpr()
df_renamed = df.selectExpr("id", "value * 2 as double_value", "value as original_value")
df_renamed.show()
==output==
+---+------------+--------------+
| id|double_value|original_value|
+---+------------+--------------+
|  1|          20|            10|
|  2|          40|            20|
|  3|          60|            30|
+---+------------+--------------+
# Use CASE WHEN to categorize values
df_categorized = df.selectExpr("id", "value", "CASE WHEN value >= 20 THEN 'High' ELSE 'Low' END as category")
df_categorized.show()
==output==
+---+-----+--------+
| id|value|category|
+---+-----+--------+
|  1|   10|     Low|
|  2|   20|    High|
|  3|   30|    High|
+---+-----+--------+
# Apply multiple transformations and expressions
df_combined = df.selectExpr("id", "value", "value * 2 as double_value", "CASE WHEN value >= 20 THEN 'High' ELSE 'Low' END as category")
df_combined.show()
==output==
+---+-----+------------+--------+
| id|value|double_value|category|
+---+-----+------------+--------+
|  1|   10|          20|     Low|
|  2|   20|          40|    High|
|  3|   30|          60|    High|
+---+-----+------------+--------+
Comparison: expr() and selectExpr()
Key Differences
expr() is used when you want to apply SQL-like expressions in the middle of a transformation, typically within withColumn() or filter().
selectExpr() is used when you want to apply multiple expressions in a single statement to transform and select columns, much like a SQL SELECT statement.
Feature | expr() | selectExpr()
Purpose | Used for applying single SQL expressions | Used for applying multiple SQL expressions
Typical Usage | Inside select(), withColumn(), filter() | As a standalone method for multiple expressions
Operates On | Individual column expressions | Multiple expressions at once
Flexibility | Allows complex operations on a single column | Simpler for multiple transformations
Example | df.select(expr("age + 5").alias("new_age")) | df.selectExpr("age + 5 as new_age", "age * 2")
Use Case | Fine-grained control of expressions in various transformations | Quickly apply and select multiple expressions as new columns
Conclusion
expr(): Ideal for transforming or adding individual columns using SQL expressions.
selectExpr(): Useful for selecting, renaming, and transforming multiple columns at once using SQL-like syntax.
na.fill()
na.fill(): Used to replace null values in DataFrames.
# Fill null values in a specific column with a default value (e.g., 0)
df = df.na.fill({"value": 0})
df.show()
==output==
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4|    0|
+---+-----+
coalesce()
The coalesce() function returns the first non-null value among the columns you provide.
Syntax
from pyspark.sql.functions import coalesce
coalesce(col1, col2, …, colN)
from pyspark.sql.functions import coalesce, col
# Return the first non-null value from multiple columns
df = df.withColumn("first_non_null", coalesce(col("column1"), col("column2"), col("column3")))
isin()
The isin() function checks whether a column’s value belongs to a list or set of values.
Syntax
from pyspark.sql.functions import col
col("column_name").isin(value1, value2, …, valueN)
A related function, between(), checks whether a value falls within a range:
from pyspark.sql.functions import col
# Check if the value is between 20 and 30
df = df.withColumn("value_between_20_and_30", col("value").between(20, 30))
df.show()
==output==
+---+-----+-----------------------+
| id|value|value_between_20_and_30|
+---+-----+-----------------------+
|  1|   15|                  false|
|  2|   20|                   true|
|  3|   30|                   true|
|  4|   36|                  false|
+---+-----+-----------------------+
Note: between() includes the boundary values 20 and 30.
isNull(), isNotNull()
PySpark provides the isNull() and isNotNull() column methods to check for null values in DataFrame columns.
withColumn()
withColumn() is a transformation that adds or replaces a column in a DataFrame. If a column named "column_name" already exists, it is replaced; otherwise, "column_name" is added as a new column.
Syntax
DataFrame.withColumn("column_name", expression)
Parameters
"column_name": The name of the new or existing column.
expression: Any transformation, calculation, or function applied to create or modify the column.
Basic Column Creation (with literal values)
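from pyspark.sql.functions import lit
# Add a column with a constant value
df_new = df.withColumn("New_Column", lit(100))
==output==
+---+-------+-----+----------+
| ID|   Name|Grade|New_Column|
+---+-------+-----+----------+
|  1|  Alice| null|       100|
|  2|    Bob|    B|       100|
|  3|Charlie|    C|       100|
+---+-------+-----+----------+
# Add a new column with no value
df_new = df.withColumn("New_Column", lit(None))
==output==
+---+-------+-----+----------+
| ID|   Name|Grade|New_Column|
+---+-------+-----+----------+
|  1|  Alice| null|      null|
|  2|    Bob|    B|      null|
|  3|Charlie|    C|      null|
+---+-------+-----+----------+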
Arithmetic Operation
from pyspark.sql.functions import col
# Create a new column based on arithmetic operations
df_arithmetic = df.withColumn("New_ID", col("ID") * 2 + 5)
df_arithmetic.show()
==output==
+---+-------+-----+------+
| ID|   Name|Grade|New_ID|
+---+-------+-----+------+
|  1|  Alice| null|     7|
|  2|    Bob|    B|     9|
|  3|Charlie|    C|    11|
+---+-------+-----+------+
Using SQL Function
You can use functions like concat(), substring(), when(), length(), etc.
from pyspark.sql.functions import col, concat, lit
# Concatenate two columns with a separator
df_concat = df.withColumn("Full_Description", concat(col("Name"), lit(" has ID "), col("ID")))
Conditional Logic
Using when() and otherwise() is equivalent to SQL’s CASE WHEN expression.
from pyspark.sql.functions import col, when
# Add a new column with conditional logic
df_conditional = df.withColumn("Is_Adult", when(col("ID") > 18, "Yes").otherwise("No"))
df_conditional.show()
String Function
You can apply string functions like upper(), lower(), or substring().
from pyspark.sql.functions import col, upper
# Convert a column to uppercase
df_uppercase = df.withColumn("Uppercase_Name", upper(col("Name")))
df_uppercase.show()
Type Casting
You can cast a column to a different data type.
# Cast the ID column to a string
df_cast = df.withColumn("ID_as_string", col("ID").cast("string"))
df_cast.show()
Handling Null Values
You can create columns that handle null values using coalesce() or na.fill().
Coalesce:
This function returns the first non-null value among its arguments.
from pyspark.sql.functions import coalesce, col, lit
# Return the Name value, or "Unknown" when Name is null
df_coalesce = df.withColumn("NonNullValue", coalesce(col("Name"), lit("Unknown")))
df_coalesce.show()
Fill Missing Values:
# Replace nulls in a column with a default value
df_fill = df.na.fill({"Name": "Unknown"})
df_fill.show()
Creating Columns with Complex Expressions
You can create columns based on more complex expressions or multiple transformations at once.
# Create a column with multiple transformations
df_complex = df.withColumn(
    "Complex_Column",
    concat(upper(col("Name")), lit("_"), col("ID").cast("string"))
)
df_complex.show(truncate=False)
Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, when, upper, coalesce
# Initialize Spark session
spark = SparkSession.builder.appName("withColumnExample").getOrCreate()
# Create a sample DataFrame
data = [(1, "Alice", None), (2, "Bob", "B"), (3, "Charlie", "C")]
df = spark.createDataFrame(data, ["ID", "Name", "Grade"])
# Perform various transformations using withColumn()
df_transformed = df.withColumn("ID_Multiplied", col("ID") * 10) \
    .withColumn("Full_Description", concat(upper(col("Name")), lit(" - ID: "), col("ID"))) \
    .withColumn("Pass_Status", when(col("Grade") == "C", "Pass").otherwise("Fail")) \
    .withColumn("Non_Null_Grade", coalesce(col("Grade"), lit("N/A"))) \
    .withColumn("ID_as_String", col("ID").cast("string"))
# Show the result
df_transformed.show(truncate=False)
==output==
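+---+-------+-----+-------------+----------------+-----------+--------------+------------+
|ID |Name   |Grade|ID_Multiplied|Full_Description|Pass_Status|Non_Null_Grade|ID_as_String|
+---+-------+-----+-------------+----------------+-----------+--------------+------------+
|1  |Alice  |null |10           |ALICE - ID: 1   |Fail       |N/A           |1           |
|2  |Bob    |B    |20           |BOB - ID: 2     |Fail       |B             |2           |
|3  |Charlie|C    |30           |CHARLIE - ID: 3 |Pass       |C             |3           |
+---+-------+-----+-------------+----------------+-----------+--------------+------------+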
select()
select() is used to project (select) a set of columns or expressions from a DataFrame. This function allows you to choose and work with specific columns, create new columns, or apply transformations to the data.
Syntax
DataFrame.select(*cols)
Commonly Used PySpark Functions with select()
col(column_name): Refers to a column.
alias(new_name): Assigns a new name to a column.
lit(value): Adds a literal value.
round(column, decimals): Rounds off the values in a column.
Renaming Columns:
df.select(df["column1"].alias("new_column1"), df["column2"]).show()
Using Expressions:
from pyspark.sql import functions as F
df.select(F.col("column1"), F.lit("constant_value"), (F.col("column2") + 10).alias("modified_column2")).show()
Performing Calculations
df.select((df["column1"] * 2).alias("double_column1"), F.round(df["column2"], 2).alias("rounded_column2")).show()
Handling Complex Data Types (Struct, Array, Map):
df.select("struct_column.field_name").show()
Selecting with Wildcards:
While PySpark doesn’t support SQL-like wildcards directly, you can achieve similar functionality using selectExpr (discussed below) or other methods like looping over df.columns.
df.select([c for c in df.columns if "some_pattern" in c]).show()
Using selectExpr()
df.selectExpr("column1", "column2 + 10 as new_column2").show()
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
# Reading Parquet files (a comment after a line-continuation backslash is a syntax error, so the chain is wrapped in parentheses)
df = (
    spark.read
    .format("parquet")
    # Merge the schemas of all files (useful when reading multiple files with different schemas)
    .option("mergeSchema", "true")
    # Read only specific files based on file name patterns
    .option("pathGlobFilter", "*.parquet")
    # Recursively read files from directories and subdirectories
    .option("recursiveFileLookup", "true")
    .load("/path/to/parquet/file/or/directory")
)
Options
mergeSchema: When reading Parquet files with different schemas, merge them into a single schema.
true (default: false)
pathGlobFilter: Allows specifying a file pattern to filter which files to read (e.g., “*.parquet”).
recursiveFileLookup: Reads files recursively from subdirectories.
true (default: false)
modifiedBefore/modifiedAfter: Filter files by modification time. For example:
.option("modifiedBefore", "2023-10-01T12:00:00")
.option("modifiedAfter", "2023-01-01T12:00:00")
maxFilesPerTrigger: Limits the number of files processed in a single trigger, useful for streaming jobs.
schema: Provides the schema of the Parquet file (useful when reading files without inferring the schema). It is set with the reader’s .schema() method rather than with .option().
You can also pass a list of file paths to load() programmatically. These options help streamline your data ingestion process when dealing with multiple Parquet files in Databricks.
Write to parquet
Syntax
# Writing a Parquet file
(
    df.write
    .format("parquet")
    # Save mode: "overwrite", "append", "ignore", or "error"
    .mode("overwrite")
    # Compression options: none, snappy, gzip, lzo, brotli, etc.
    .option("compression", "snappy")
    # Max number of records per file
    .option("maxRecordsPerFile", 100000)
    .option("path", "/path/to/output/directory")
    # Partition the output by specific columns
    .partitionBy("year", "month")
    .save()
)
Options
compression: .option("compression", "snappy")
Specifies the compression codec to use when writing files. Options: none, snappy (default), gzip, lzo, brotli, lz4, zstd, etc.