from_json() is a function used to parse a JSON string column into a structured type (such as StructType, ArrayType, or MapType). It is commonly used to deserialize JSON strings stored in a DataFrame column into complex types that PySpark can work with more easily.
Syntax
from_json(column, schema, options={})
Parameters
column: The column containing the JSON string. Can be a string that refers to the column name or a column object.
schema
Specifies the schema of the expected JSON structure. Can be a StructType (or other types like ArrayType depending on the JSON structure).
options
allowUnquotedFieldNames: Allows field names without quotes. (default: false)
allowNumericLeadingZeros: Allows leading zeros in numbers. (default: false)
allowBackslashEscapingAnyCharacter: Allows escaping any character with a backslash. (default: false)
mode: Controls how to handle malformed records.
PERMISSIVE: The default mode; sets corrupted fields to null.
DROPMALFORMED: Discards rows with malformed JSON strings.
FAILFAST: Fails the query if any malformed records are found.
Sample DF
+---------------------------------------------------------+
|json_string                                              |
+---------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}    |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25} |
+---------------------------------------------------------+
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col
# Define the schema for the nested JSON
schema = StructType([
    StructField("name", StructType([
        StructField("first", StringType(), True),
        StructField("last", StringType(), True)
    ]), True),
    StructField("age", IntegerType(), True)
])
# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))
# Display the parsed JSON
df_parsed.select("parsed_json.*").show(truncate=False)
+--------------+---+
|name          |age|
+--------------+---+
|{John, Doe}   |30 |
|{Alice, Smith}|25 |
+--------------+---+
to_json()
to_json() is a function that converts a structured column (such as one of type StructType, ArrayType, etc.) into a JSON string.
Syntax
to_json(column, options={})
Parameters
column: The column you want to convert into a JSON string. The column should be of a complex data type, such as StructType, ArrayType, or MapType. Can be a column name (as a string) or a Column object.
options
pretty: If set to true, it pretty-prints the JSON output.
dateFormat: Specifies the format for DateType and TimestampType columns (default: yyyy-MM-dd).
timestampFormat: Specifies the format for TimestampType columns (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
ignoreNullFields: When set to true, null fields are omitted from the resulting JSON string (default: true).
compression: Controls the compression codec used to compress the JSON output, e.g., gzip, bzip2.
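A minimal sketch of to_json(), reusing the df_parsed DataFrame and its parsed_json struct column from the from_json() example above (assumed to still be available):
from pyspark.sql.functions import to_json, col
# Convert the struct column back into a JSON string
df_json = df_parsed.withColumn("json_out", to_json(col("parsed_json")))
df_json.select("json_out").show(truncate=False)
# Expected output (roughly):
# {"name":{"first":"John","last":"Doe"},"age":30}
# {"name":{"first":"Alice","last":"Smith"},"age":25}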
StructType(), StructField()
StructType() in PySpark is part of the pyspark.sql.types module and is used to define the structure of a DataFrame schema. StructField() is a fundamental part of PySpark's StructType, used to define individual fields (columns) within a schema. A StructField specifies the name, data type, and other attributes of a column in a DataFrame schema.
fields (optional): A list of StructField objects that define the schema. Each StructField object specifies the name, type, and whether the field can be null.
Key Components
name: The name of the column.
dataType: The data type of the column (e.g., StringType(), IntegerType(), DoubleType(), etc.).
nullable: Boolean flag indicating whether the field can contain null values (True for nullable).
Common Data Types Used in StructField
StringType(): Used for string data.
IntegerType(): For integers.
DoubleType(): For floating-point numbers.
LongType(): For long integers.
ArrayType(): For arrays (lists) of values.
MapType(): For key-value pairs (dictionaries).
TimestampType(): For timestamp fields.
BooleanType(): For boolean values (True/False).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Initialize Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()
# Define schema using StructType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# Create DataFrame using the schema
data = [("John", 30), ("Alice", 25)]
df = spark.createDataFrame(data, schema)
df.show()
+-----+---+
| name|age|
+-----+---+
| John| 30|
|Alice| 25|
+-----+---+
Nested Schema
StructType can define a nested schema. For example, a column in the DataFrame might itself contain multiple fields.
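A small sketch of a nested schema; the "address" field and its sub-fields here are purely illustrative:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# "address" is itself a struct with two fields
nested_schema = StructType([
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zip", StringType(), True)
    ]), True),
    StructField("age", IntegerType(), True)
])
data = [("John", ("New York", "10001"), 30)]
df_nested = spark.createDataFrame(data, nested_schema)
df_nested.select("name", "address.city").show()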
The contains() function in PySpark is used to check if a string column contains a specific substring. It’s typically used in a filter() or select() operation to search within the contents of a column and return rows where the condition is true.
Syntax
Column.contains(substring: str)
Key Points
contains() is case-sensitive by default. For case-insensitive matching, use it with lower() or upper().
It works on string columns only. For non-string columns, you would need to cast the column to a string first.
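A short sketch, assuming a DataFrame df with a string column Name:
from pyspark.sql.functions import col, lower
# Rows where Name contains the substring "li" (case-sensitive)
df.filter(col("Name").contains("li")).show()
# Case-insensitive match: lower the column first
df.filter(lower(col("Name")).contains("li")).show()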
The collect() function gathers all rows from the DataFrame or RDD and returns them to the driver as a list of Row objects. It does not accept any parameters.
Syntax
DataFrame.collect()
Key Points
Use on Small Data: Since collect() brings all the data to the driver, it should be used only on small datasets. If you try to collect a very large dataset, it can cause memory issues or crash the driver.
For large datasets, consider alternatives like take(n), toPandas(), or show(n), as in the sketch below.
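For example, take(n) returns only the first n rows to the driver instead of the whole dataset (a minimal sketch, using the sample DF shown below):
# Bring back only the first 2 rows instead of the full DataFrame
first_rows = df.take(2)
print(first_rows)
# [Row(Name='Alice', Age=25), Row(Name='Bob', Age=30)]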
sample DF
+-------+---+
| Name|Age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
# Collect all rows from the DataFrame
collected_data = df.collect()
# Access all rows
print(collected_data)
[Row(Name='Alice', Age=25), Row(Name='Bob', Age=30), Row(Name='Charlie', Age=35)]
# Access a row
print(collected_data[0])
Row(Name='Alice', Age=25)
# Access a cell
print(collected_data[0][0])
Alice
# Display the collected data
for row in collected_data:
    print(row)
==output==
Row(Name='Alice', Age=25)
Row(Name='Bob', Age=30)
Row(Name='Charlie', Age=35)
# Filter and collect
filtered_data = df.filter(df["Age"] > 30).collect()
# Process collected data
for row in filtered_data:
    print(f"{row['Name']} is older than 30.")
==output==
Charlie is older than 30.
# Access specific columns from collected data
for row in collected_data:
    print(f"Name: {row['Name']}, Age: {row.Age}")
==output==
Name: Alice, Age: 25
Name: Bob, Age: 30
Name: Charlie, Age: 35
transform ()
The transform() method in PySpark applies a custom transformation function to a DataFrame and returns the resulting DataFrame. Each function takes a DataFrame and returns a DataFrame, which makes transform() useful for chaining reusable transformations in a readable, pipeline-like style.
Syntax
df1 = df.transform(my_function)
Parameters
my_function: a Python function that takes a DataFrame and returns a DataFrame.
Key point
The transform() method takes a function as an argument. It automatically passes the DataFrame (in this case, df) to the function, so you only need to pass the function name "my_function".
sample DF
+-------+---+
| Name|Age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
from pyspark.sql.functions import upper
# Declare my functions
def addTheAge(mydf):
    return mydf.withColumn("Age", mydf.Age + 2)
def upcaseTheName(mydf):
    return mydf.withColumn("Name", upper(mydf.Name))
#transforming: age add 2 years
df1=df.transform(addTheAge)
df1.show()
+-------+---+
| Name|Age|
+-------+---+
| Alice| 27|
| Bob| 32|
|Charlie| 37|
+-------+---+
#transforming to upcase
df1=df.transform(upcaseTheName)
df1.show()
+-------+---+
| Name|Age|
+-------+---+
| ALICE| 25|
| BOB| 30|
|CHARLIE| 35|
+-------+---+
#transforming to upcase and age add 2 years
df1=df.transform(addTheAge).transform(upcaseTheName)
df1.show()
+-------+---+
| Name|Age|
+-------+---+
| ALICE| 27|
| BOB| 32|
|CHARLIE| 37|
+-------+---+
udf()
The udf (User Defined Function) allows you to create custom transformations for DataFrame columns using Python functions. You can use UDFs when PySpark’s built-in functions don’t cover your use case or when you need more complex logic.
Syntax
from pyspark.sql.functions import udf
from pyspark.sql.types import DataType
# Define a Python function
# Register the function as a UDF
my_udf = udf(py_function, returnType)
Parameters
py_function: A Python function you define to transform the data.
returnType: PySpark data type (e.g., StringType(), IntegerType(), DoubleType(), etc.) that indicates what type of data your UDF returns.
StringType(): For returning strings.
IntegerType(): For integers.
FloatType(): For floating-point numbers.
BooleanType(): For booleans.
ArrayType(DataType): For arrays.
StructType(): For structured data.
Key point
You need to specify the return type for the UDF, as PySpark needs to understand how to handle the results of the UDF.
sample DF
+-------+---+
| Name|Age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a Python function
def to_upper(name):
    return name.upper()
# Register the function as a UDF, specifying the return type as StringType
uppercase_udf = udf(to_upper, StringType())
# Apply the UDF using withColumn
df_upper = df.withColumn("Upper_Name", uppercase_udf(df["Name"]))
df_upper.show()
+-------+---+----------+
| Name|Age|Upper_Name|
+-------+---+----------+
| Alice| 25| ALICE|
| Bob| 30| BOB|
|Charlie| 35| CHARLIE|
+-------+---+----------+
# UDF Returning Boolean
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
# Define a Python function
def is_adult(age):
    return age > 30
# Register the UDF
is_adult_udf = udf(is_adult, BooleanType())
# Apply the UDF
df_adult = df.withColumn("Is_Adult", is_adult_udf(df["Age"]))
# Show the result
df_adult.show()
+-------+---+--------+
| Name|Age|Is_Adult|
+-------+---+--------+
| Alice| 25| false|
| Bob| 30| false|
|Charlie| 35| true|
+-------+---+--------+
# UDF with Multiple Columns (Multiple Arguments)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a Python function that concatenates two columns
def concat_name_age(name, age):
    return f"{name} is {age} years old"
# Register the UDF
concat_udf = udf(concat_name_age, StringType())
# Apply the UDF to multiple columns
df_concat = df.withColumn("Description", concat_udf(df["Name"], df["Age"]))
# Show the result
df_concat.show()
+-------+---+--------------------+
| Name|Age| Description|
+-------+---+--------------------+
| Alice| 25|Alice is 25 years...|
| Bob| 30| Bob is 30 years old|
|Charlie| 35|Charlie is 35 yea...|
+-------+---+--------------------+
spark.udf.register(): register a UDF for SQL queries
In PySpark, you can use spark.udf.register() to register a User Defined Function (UDF) so that it can be used not only in DataFrame operations but also in SQL queries. This allows you to apply your custom Python functions directly within SQL statements executed against your Spark session.
Syntax
spark.udf.register("registered_pyFun_for_sql", original_pyFun, returnType)
Parameters
registered_pyFun_for_sql: The name of the UDF, which will be used in SQL queries.
original_pyFun: The original Python function that contains the custom logic.
returnType: The return type of the UDF, which must be specified (e.g., StringType(), IntegerType()).
sample DF
+-------+---+
| Name|Age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# Define a Python function
def original_myPythonFun(name):
return name.upper()
# Register the function as a UDF for SQL with the return type StringType
spark.udf.register("registered_pythonFun_for_sql"\
, original_myPythonFun\
, StringType()\
)
# Use the UDF - "registered_pythonFun_for_sql" in a SQL query
result = spark.sql(\
"SELECT Name, registered_pythonFun_for_sql(Name) AS Upper_Name \
FROM people"\
)
result.show()
+-------+----------+
| Name|Upper_Name|
+-------+----------+
| Alice| ALICE|
| Bob| BOB|
|Charlie| CHARLIE|
+-------+----------+
In a notebook, we can also call the registered UDF directly in SQL using the %sql magic:
%sql
SELECT Name
, registered_pythonFun_for_sql(Name) AS Upper_Name
FROM people
+-------+----------+
| Name|Upper_Name|
+-------+----------+
| Alice| ALICE|
| Bob| BOB|
|Charlie| CHARLIE|
+-------+----------+
A more complex example: declaring a Python function, registering the UDF, and applying it
Define the Python function to handle multiple columns and scalars
# Define the Python function to handle multiple columns and scalars
def pyFun(col1, col2, col3, col9, int1, int2, int3, str1, str2, str3):
    # Example business logic
    re_value1 = col1 * int1 + len(str1)
    re_value2 = col2 + col9 + len(str2)
    re_value3 = col3 * int3 + len(str3)
    value4 = re_value1 + re_value2 + re_value3
    # Return multiple values as a tuple
    return re_value1, re_value2, re_value3, value4
Define the return schema for the UDF
# Define the return schema for the UDF
return_schema = StructType([
    StructField("re_value1", IntegerType(), True),
    StructField("re_value2", IntegerType(), True),
    StructField("re_value3", IntegerType(), True),
    StructField("value4", IntegerType(), True)
])
Register the UDF
# register the UDF
# for SQL use this
# spark.udf.register("pyFun_udf", pyFun, returnType=return_schema)
pyFun_udf = udf(pyFun, returnType=return_schema)
Apply the UDF to the DataFrame, using ‘lit()’ for constant values
# Apply the UDF to the DataFrame, using 'lit()' for constant values
df_with_udf = df.select(
    col("col1"),
    col("col2"),
    col("col3"),
    col("col9"),
    col("str1"),
    col("str2"),
    col("str3"),
    pyFun_udf(
        col("col1"),
        col("col2"),
        col("col3"),
        col("col9"),
        lit(10),  # Use lit() for the constant integer values
        lit(20),
        lit(30),
        col("str1"),
        col("str2"),
        col("str3")
    ).alias("result")
)
# Show the result
df_with_udf.show(truncate=False)
==output==
+----+----+----+----+-----+-------+-----+------------------+
|col1|col2|col3|col9|str1 |str2 |str3 |result |
+----+----+----+----+-----+-------+-----+------------------+
|1 |2 |3 |9 |alpha|beta |gamma|{15, 15, 95, 125} |
|4 |5 |6 |8 |delta|epsilon|zeta |{45, 20, 184, 249}|
+----+----+----+----+-----+-------+-----+------------------+
The join() method is used to combine two DataFrames based on a common column or multiple columns. This method is extremely versatile, supporting various types of SQL-style joins such as inner, outer, left, and right joins.
Syntax
DataFrame.join(other, on=None, how=None)
Parameters
other: The other DataFrame to join with the current DataFrame.
on: A string or a list of column names on which to join. This can also be an expression (using col() or expr()).
how: The type of join to perform. It can be one of
'inner': Inner join (default). Returns rows that have matching values in both DataFrames.
'outer' or 'full': Full outer join. Returns all rows from both DataFrames, with null values for missing matches.
'left' or 'left_outer': Left outer join. Returns all rows from the left DataFrame and matched rows from the right DataFrame. Unmatched rows from the right DataFrame result in null values.
'right' or 'right_outer': Right outer join. Returns all rows from the right DataFrame and matched rows from the left DataFrame. Unmatched rows from the left DataFrame result in null values.
'left_semi': Left semi join. Returns only the rows from the left DataFrame where the join condition is satisfied.
'left_anti': Left anti join. Returns only the rows from the left DataFrame where no match is found in the right DataFrame.
'cross': Cross join (Cartesian product). Returns the Cartesian product of both DataFrames, meaning every row from the left DataFrame is combined with every row from the right DataFrame.
The other join types behave the same as their SQL counterparts. Since semi and anti joins are newer to many users, let's focus on them:
Left Semi Join: Only returns rows from the left DataFrame that have matches in the right DataFrame.
Left Anti Join: Only returns rows from the left DataFrame that don’t have matches in the right DataFrame.
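The df1 and df2 used below are not shown in the original; a minimal, hypothetical setup consistent with the outputs would be:
# Hypothetical sample DataFrames for the join examples
df1 = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")],
    ["dept_id", "name"]
)
df2 = spark.createDataFrame(
    [(1, "HR"), (2, "Sales")],
    ["dept_id", "dept_name"]
)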
# Left semi join
df_left_semi = df1.join(df2, on="dept_id", how="left_semi")
df_left_semi.show()
==output==
+-------+-----+
|dept_id| name|
+-------+-----+
| 1|Alice|
| 2| Bob|
+-------+-----+
Charlie's dept_id is 3; it does not appear in df2, so that row is skipped.
# Left anti join
df_left_anti = df1.join(df2, on="dept_id", how="left_anti")
df_left_anti.show()
==output==
+-------+-------+
|dept_id| name|
+-------+-------+
| 3|Charlie|
+-------+-------+
Only Charlie (dept_id = 3) has no match in df2, so only that row is returned.
union ()
The union() method is used to combine two DataFrames with the same schema (i.e., same number and type of columns). This operation concatenates the rows of the two DataFrames, similar to a SQL UNION operation, but without removing duplicate rows (like UNION ALL in SQL).
Syntax
DataFrame.union(other)
Key Points
Schema Compatibility: Both DataFrames must have the same number of columns, and the data types of the corresponding columns must match.
Union Behavior: Unlike SQL’s UNION which removes duplicates, union() in PySpark keeps all rows, including duplicates. This is equivalent to SQL’s UNION ALL.
Order of Rows: The rows from the first DataFrame will appear first, followed by the rows from the second DataFrame.
Column Names and Data Types Must Match: The column names don’t need to be identical in both DataFrames, but their positions and data types must match. If the number of columns or the data types don’t align, an error will be raised.
Union with Different Column Names: Even though column names don’t need to be the same, the columns are merged by position, not by name. If you attempt to union() DataFrames with different column orders, the results could be misleading. Therefore, it’s important to make sure the schemas match.
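A short sketch of union(), with two small DataFrames assumed to share the same schema:
df_a = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
df_b = spark.createDataFrame([("Charlie", 35), ("Alice", 25)], ["name", "age"])
# Keeps duplicates, like SQL UNION ALL
df_union = df_a.union(df_b)
df_union.show()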
unionByName()
The unionByName() method in PySpark is similar to the union() method but with a key distinction: it merges two DataFrames by aligning columns based on column names, rather than their positions.
Syntax
DataFrame.unionByName(other, allowMissingColumns=False)
Parameters
other: The other DataFrame to be unioned with the current DataFrame.
allowMissingColumns: A boolean flag (False by default). If True, this allows the union of DataFrames even if one DataFrame has columns that are missing in the other. The missing columns in the other DataFrame will be filled with null values.
Key Points
Column Name Alignment: The method aligns columns by name, not by their position, which makes it flexible for combining DataFrames that have different column orders.
Handling Missing Columns: By default, if one DataFrame has columns that are missing in the other, PySpark will throw an error. However, setting allowMissingColumns=True allows unioning in such cases, and missing columns in one DataFrame will be filled with null values in the other.
Duplicate Rows: Just like union(), unionByName() does not remove duplicates, and the result includes all rows from both DataFrames.
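A sketch of unionByName() with allowMissingColumns=True; the DataFrames and column names here are illustrative:
df_x = spark.createDataFrame([("Alice", 25)], ["name", "age"])
df_y = spark.createDataFrame([(30, "Bob", "HR")], ["age", "name", "dept"])
# Columns are matched by name; "dept" is missing in df_x, so it is filled with null
df_merged = df_x.unionByName(df_y, allowMissingColumns=True)
df_merged.show()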
unionAll() was an older method used to combine two DataFrames without removing duplicates. However, starting from PySpark 2.0, unionAll() has been deprecated and replaced by union(). The behavior of unionAll() is now identical to that of union() in PySpark.
For usage details, see union() above.
fillna (), df.na.fill()
fillna() is a method used to replace null (or missing) values in a DataFrame with a specified value. Return new DataFrame with null values replaced by the specified value.
Syntax
DataFrame.fillna(value, subset=None)
df.na.fill(value, subset=None)
df.na.fill() produces the same result as df.fillna(); the two are interchangeable.
Parameters
value: The value to replace null with. It can be a scalar value (applied across all columns) or a dictionary (to specify column-wise replacement values). The type of value should match the type of the column you are applying it to (e.g., integers for integer columns, strings for string columns).
subset: Specifies the list of column names where the null values will be replaced. If not provided, the replacement is applied to all columns.
sample df
+-------+----+----+
| name| age|dept|
+-------+----+----+
| Alice| 25|null|
| Bob|null| HR|
|Charlie| 30|null|
+-------+----+----+
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- dept: string (nullable = true)
"age" is long, "dept" is string
# Fill null values with default values for all columns
df_filled = df.fillna(0)
df_filled.show()
==output==
+-------+---+----+
| name|age|dept|
+-------+---+----+
| Alice| 25|null|
| Bob| 0| HR|
|Charlie| 30|null|
+-------+---+----+
Bob's age is filled with 0 because the column is numeric (long); the string column "dept" is not affected by the integer fill value and remains null.
Handle different Columns with different data type
# Fill nulls in the 'age' column with 0 and in the 'dept' column with "Unknown"
df_filled_columns = df.fillna({"age": 0, "dept": "Unknown"})
df_filled_columns.show()
==output==
+-------+---+-------+
|   name|age|   dept|
+-------+---+-------+
|  Alice| 25|Unknown|
|    Bob|  0|     HR|
|Charlie| 30|Unknown|
+-------+---+-------+
Fill Nulls in Specific Subset of Columns
# Fill null values only in the 'age' column
df_filled_age = df.fillna(0, subset=["age"])
df_filled_age.show()
+-------+---+----+
| name|age|dept|
+-------+---+----+
| Alice| 25|null|
| Bob| 0| HR|
|Charlie| 30|null|
+-------+---+----+
select()
select() is used to project a subset of columns from a DataFrame or to create new columns based on expressions. It returns a new DataFrame containing only the selected columns or expressions.
distinct () is used to remove duplicate rows from a DataFrame or RDD, leaving only unique rows. It returns a new DataFrame that contains only unique rows from the original DataFrame.
dropDuplicates () is used to remove duplicate rows from a DataFrame based on one or more specific columns.
Syntax:
DataFrame.dropDuplicates([col1, col2, …, coln])
Parameters
cols (optional): This is a list of column names based on which you want to drop duplicates. If no column names are provided, dropDuplicates() will behave similarly to distinct(), removing duplicates across all columns.
# Drop duplicates across all columns (similar to distinct)
df_no_duplicates = df.dropDuplicates()
==output==
+-------+---+
| name|age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
# Drop duplicates based on the "name" column
df_unique_names = df.dropDuplicates(["name"])
==output==
+-------+---+
| name|age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
In the second case, only the first occurrence of each name is kept, while duplicates are removed, regardless of other columns.
orderBy(), sort ()
orderBy() or sort () method is used to sort the rows of a DataFrame based on one or more columns in ascending or descending order. It is equivalent to the SQL ORDER BY clause. The method returns a new DataFrame that is sorted based on the provided column(s).
In PySpark, both orderBy() and sort() methods are available, and they are essentially aliases of each other, with no functional difference. You can use either based on preference:
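A brief sketch, assuming the Name/Age sample DataFrame used earlier:
from pyspark.sql.functions import col
# Sort by Age ascending
df.orderBy("Age").show()
# Sort by Age descending (sort() behaves the same way)
df.sort(col("Age").desc()).show()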
groupBy()
The groupBy() method groups rows by one or more columns so that aggregations (such as count()) can be applied to each group.
# Group by department and count the number of employees in each department
df_grouped = df.groupBy("department").count()
==output==
+----------+-----+
|department|count|
+----------+-----+
| Sales| 3|
| HR| 2|
+----------+-----+
# Group by multiple columns
original dataset
+-------+----------+------+
| name|department|gender|
+-------+----------+------+
| Alice| Sales|Female|
| Bob| Sales| Male|
|Charlie| HR| Male|
| David| HR| Male|
| Eve| Sales|Female|
+-------+----------+------+
# Group by department and gender, then count the number of employees in each group
df_grouped_multi = df.groupBy("department", "gender").count()
+----------+------+-----+
|department|gender|count|
+----------+------+-----+
| Sales|Female| 2|
| Sales| Male| 1|
| HR| Male| 2|
+----------+------+-----+
agg ()
The agg() function in PySpark is used for performing aggregate operations on a DataFrame, such as computing sums, averages, counts, and other aggregations. It is often used in combination with groupBy() to perform group-wise aggregations, as in the sketch below.
Syntax:
from pyspark.sql.functions import sum, avg, count, max, min
DataFrame.agg(*exprs)
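A minimal sketch of agg() combined with groupBy(); the employee DataFrame and its salary column here are hypothetical:
from pyspark.sql.functions import sum, avg, count
# Hypothetical employees DataFrame with a numeric salary column
df_emp = spark.createDataFrame(
    [("Alice", "Sales", 5000), ("Bob", "Sales", 4000), ("Charlie", "HR", 4500)],
    ["name", "department", "salary"]
)
# Group-wise aggregation with agg()
df_emp.groupBy("department").agg(
    count("*").alias("num_employees"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary")
).show()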
cast()
The cast() method converts a column from one data type to another, e.g., col("age").cast(dataType). The dataType argument can be a string representing the data type (e.g., "int", "double", "string", etc.) or a PySpark DataType object (like IntegerType(), StringType(), FloatType(), etc.).
Common Data Types:
IntegerType(), "int": For integer values.
DoubleType(), "double": For double (floating-point) values.
FloatType(), "float": For floating-point numbers.
StringType(), "string": For text or string values.
DateType(), "date": For date values.
TimestampType(), "timestamp": For timestamps.
BooleanType(), "boolean": For boolean values (true/false).
from pyspark.sql.functions import col
# Cast a string column to integer
df1 = df.withColumn("age_int", col("age").cast("int"))
df1.printSchema()
==output==
root
|-- id: long (nullable = true)
|-- age: long (nullable = true)
|-- age_int: integer (nullable = true)
# Cast 'id' from long to string and 'age' from long to double
df_casted = df.withColumn("id", col("id").cast("string")) \
.withColumn("age", col("age").cast("double"))
df_casted.show()
df_casted.printSchema()
==output==
+---+----+
| id| age|
+---+----+
| 1|25.0|
| 2|12.0|
| 3|40.0|
+---+----+
root
|-- id: string (nullable = true)
|-- age: double (nullable = true)
filter(), where()
The filter() or where() function is used to filter rows from a DataFrame based on a condition or set of conditions. It works similarly to SQL's WHERE clause.
Syntax
df.filter(condition)
df.where(condition)
Condition (for ‘filter’)
& (AND)
| (OR)
~ (NOT)
== (EQUAL)
Any filter() call can be rewritten as where(), and vice versa.
sample dataframe
+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 30| 50000|
|   Bob| 25| 30000|
|Alicia| 40| 80000|
|   Ann| 32| 35000|
+------+---+------+
# Filter rows where age is greater than 30 AND salary is greater than 50000
df.filter((df["age"] > 30) & (df["salary"] > 50000))
df.where((df["age"] > 30) & (df["salary"] > 50000))
+------+---+------+
|  Name|Age|Salary|
+------+---+------+
|Alicia| 40| 80000|
+------+---+------+
# Filter rows where age is less than 25 OR salary is less than 40000
df.filter((df["age"] < 25) | (df["salary"] < 40000))
df.where((df["age"] < 25) | (df["salary"] < 40000))
+----+---+------+
|Name|Age|Salary|
+----+---+------+
| Bob| 25| 30000|
| Ann| 32| 35000|
+----+---+------+
like ()
like() function is used to perform pattern matching on string columns, similar to the SQL LIKE operator
df.filter(df["column_name"].like("pattern"))
Pattern
%: Represents any sequence of characters.
_: Represents a single character.
pattern is case sensitive.
sample dataframe
+------+---+
| Name|Age|
+------+---+
| Alice| 30|
| Bob| 25|
|Alicia| 28|
| Ann| 32|
+------+---+
# Filtering names that start with 'Al'
df.filter(df["Name"].like("Al%")).show()
+------+---+
| Name|Age|
+------+---+
| Alice| 30|
|Alicia| 28|
+------+---+
# Filtering names that end with 'n'
df.filter(df["Name"].like("%n")).show()
+----+---+
|Name|Age|
+----+---+
| Ann| 32|
+----+---+
# Filtering names that contain 'li'
df.filter(df["Name"].like("%li%")).show()
+------+---+
| Name|Age|
+------+---+
| Alice| 30|
|Alicia| 28|
+------+---+
# Filtering names where the third character is 'l'
df.filter(df["Name"].like("A_l%")).show()
+----+---+
|Name|Age|
+----+---+
+----+---+
No rows match this pattern: none of the names has 'l' as its third character.
withColumn()
withColumn() is a transformation that adds or replaces a column in a DataFrame. In other words, if "column_name" already exists, the existing column is replaced; otherwise, "column_name" is added as a new column.
Syntax
DataFrame.withColumn("column_name", expression)
Parameters
"column_name": The name of the new or existing column.
expression: Any transformation, calculation, or function applied to create or modify the column.
Basic Column Creation (with literal values)
from pyspark.sql.functions import lit
# Add a column with a constant value
df_new = df.withColumn("New_Column", lit(100))
===output===
ID Name    Grade New_Column
1  Alice   null  100
2  Bob     B     100
3  Charlie C     100
# Add a new column with no value
df_new = df.withColumn("New_Column", lit(None))
===output===
ID Name    Grade New_Column
1  Alice   null  null
2  Bob     B     null
3  Charlie C     null
Arithmetic Operation
from pyspark.sql.functions import col
# Create a new column based on arithmetic operations
df_arithmetic = df.withColumn("New_ID", col("ID") * 2 + 5)
df_arithmetic.show()
===output===
ID Name    Grade New_ID
1  Alice   null  7
2  Bob     B     9
3  Charlie C     11
Using SQL Function
you can use functions like concat(), substring(), when(), length(), etc.
from pyspark.sql.functions import concat, lit
# Concatenate two columns with a separator
df_concat = df.withColumn("Full_Description", concat(col("Name"), lit(" has ID "), col("ID")))
Conditional Logic
Using when() and otherwise() is equivalent to SQL’s CASE WHEN expression.
from pyspark.sql.functions import when
# Add a new column with conditional logic
df_conditional = df.withColumn("Is_Adult", when(col("ID") > 18, "Yes").otherwise("No"))
df_conditional.show()
String Function
You can apply string functions like upper(), lower(), or substring()
from pyspark.sql.functions import upper
# Convert a column to uppercase
df_uppercase = df.withColumn("Uppercase_Name", upper(col("Name")))
df_uppercase.show()
Type Casting
You can cast a column to a different data type.
Cast the ID column to a string
# Cast the ID column to a string
df_cast = df.withColumn("ID_as_string", col("ID").cast("string"))
df_cast.show()
Handling Null Values
create columns that handle null values using coalesce() or fill()
Coalesce:
This function returns the first non-null value among its arguments.
from pyspark.sql.functions import coalesce
# Return the first non-null value between two columns
df_coalesce = df.withColumn("NonNullValue", coalesce(col("Name"), lit("Unknown")))
df_coalesce.show()
Fill Missing Values:
# Replace nulls in a column with a default value
df_fill = df.na.fill({"Name": "Unknown"})
df_fill.show()
Creating Columns with Complex Expressions
create columns based on more complex expressions or multiple transformations at once.
# Create a column with multiple transformations
df_complex = df.withColumn(
    "Complex_Column",
    concat(upper(col("Name")), lit("_"), col("ID").cast("string"))
)
df_complex.show(truncate=False)
example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, when, upper, coalesce
# Initialize Spark session
spark = SparkSession.builder.appName("withColumnExample").getOrCreate()
# Create a sample DataFrame
data = [(1, "Alice", None), (2, "Bob", "B"), (3, "Charlie", "C")]
df = spark.createDataFrame(data, ["ID", "Name", "Grade"])
# Perform various transformations using withColumn()
df_transformed = df.withColumn("ID_Multiplied", col("ID") * 10) \
.withColumn("Full_Description", concat(upper(col("Name")), lit(" - ID: "), col("ID"))) \
.withColumn("Pass_Status", when(col("Grade") == "C", "Pass").otherwise("Fail")) \
.withColumn("Non_Null_Grade", coalesce(col("Grade"), lit("N/A"))) \
.withColumn("ID_as_String", col("ID").cast("string"))
# Show the result
df_transformed.show(truncate=False)
==output==
ID Name Grade ID_Multiplied Full_Description Pass_Status Non_Null_Grade ID_as_String
1 Alice null 10 ALICE - ID: 1 Fail N/A 1
2 Bob B 20 BOB - ID: 2 Fail B 2
3 Charlie C 30 CHARLIE - ID: 3 Pass C 3
select ()
select () is used to project (select) a set of columns or expressions from a DataFrame. This function allows you to choose and work with specific columns, create new columns, or apply transformations to the data.
Syntax
DataFrame.select(*cols)
Commonly Used PySpark Functions with select()
col(column_name): Refers to a column.
alias(new_name): Assigns a new name to a column.
lit(value): Adds a literal value.
round(column, decimals): Rounds off the values in a column.
Renaming Columns:
df.select(df["column1"].alias("new_column1"), df["column2"]).show()
Using Expressions:
from pyspark.sql import functions as F
df.select(F.col("column1"), F.lit("constant_value"), (F.col("column2") + 10).alias("modified_column2")).show()
Performing Calculations
df.select((df["column1"] * 2).alias("double_column1"), F.round(df["column2"], 2).alias("rounded_column2")).show()
Handling Complex Data Types (Struct, Array, Map):
df.select("struct_column.field_name").show()
Selecting with Wildcards:
While PySpark doesn’t support SQL-like wildcards directly, you can achieve similar functionality using selectExpr (discussed below) or other methods like looping over df.columns.
df.select([c for c in df.columns if "some_pattern" in c]).show()
Using selectExpr ()
df.selectExpr("column1", "column2 + 10 as new_column2").show()
Read from parquet
Syntax
# Reading Parquet files
df = (
    spark.read
    .format("parquet")
    # Merge schemas of all files (useful when reading from multiple files with different schemas)
    .option("mergeSchema", "true")
    # Read only specific files based on file name patterns
    .option("pathGlobFilter", "*.parquet")
    # Recursively read files from directories and subdirectories
    .option("recursiveFileLookup", "true")
    .load("/path/to/parquet/file/or/directory")
)
Options
mergeSchema: When reading Parquet files with different schemas, merge them into a single schema. Set to "true" to enable (default: false).
pathGlobFilter: Allows specifying a file pattern to filter which files to read (e.g., "*.parquet").
recursiveFileLookup: Reads files recursively from subdirectories. Set to "true" to enable (default: false).
modifiedBefore / modifiedAfter: Filter files by modification time. For example:
.option("modifiedBefore", "2023-10-01T12:00:00")
.option("modifiedAfter", "2023-01-01T12:00:00")
maxFilesPerTrigger: Limits the number of files processed in a single trigger, useful for streaming jobs.
schema: Provides the schema of the Parquet file (useful when reading files without inferring schema).
You can also programmatically pass a list of file paths, as in the sketch below. These options help streamline your data ingestion process when dealing with multiple Parquet files in Databricks.
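For example, a list of file paths can be passed directly to load(); the paths here are hypothetical:
# Read a specific list of Parquet files (hypothetical paths)
paths = ["/data/part1.parquet", "/data/part2.parquet"]
df = spark.read.format("parquet").load(paths)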
Write to parquet
Syntax
# Writing a Parquet file
(
    df.write
    .format("parquet")
    # Save modes: "overwrite", "append", "ignore", "error"
    .mode("overwrite")
    # Compression options: none, snappy, gzip, lzo, brotli, etc.
    .option("compression", "snappy")
    # Max number of records per file
    .option("maxRecordsPerFile", 100000)
    .option("path", "/path/to/output/directory")
    # Partition the output by specific columns
    .partitionBy("year", "month")
    .save()
)
Options
compression: .option("compression", "snappy")
Specifies the compression codec to use when writing files. Options: none, snappy (default), gzip, lzo, brotli, lz4, zstd, etc.
In Apache Spark, RDD, DataFrame, and Dataset are the core data structures used to handle distributed data. Each of these offers different levels of abstraction and optimization.
Basic Overview
Definition
RDD: Low-level abstraction for distributed data (objects). RDDs (Resilient Distributed Datasets) are the fundamental data structure in Spark, representing an immutable distributed collection of objects. They provide a low-level interface to Spark, allowing developers to perform operations directly on data in a functional programming style.
DataFrame: High-level abstraction for structured data (table). DataFrames are similar to tables in relational databases and are used to represent structured data. They allow you to perform complex queries and operations on structured datasets with a SQL-like syntax.
Dataset: Combines RDD and DataFrame, adds type safety (Scala/Java). Datasets are an extension of DataFrames that provide the benefits of both RDDs and DataFrames, with compile-time type safety. They are particularly useful in Scala and Java for ensuring data types during compile time.
API
RDD: Functional (map, filter, etc.). The RDD API provides functional programming constructs, allowing transformations and actions through functions like map, filter, and reduce.
DataFrame: SQL-like API + relational functions. The DataFrame API includes both SQL-like queries and relational functions, making it more user-friendly and easier to work with structured data.
Dataset: Typed API + relational functions (strong typing in Scala/Java). The Dataset API combines typed operations with the ability to perform relational functions, allowing for a more expressive and type-safe programming model in Scala/Java.
Data Structure
RDD: Collection of elements (e.g., objects, arrays). RDDs are essentially collections of objects, which can be of any type (primitive or complex). This means that users can work with various data types without a predefined schema.
DataFrame: Distributed table with named columns. DataFrames represent structured data as a distributed table, where each column has a name and a type. This structured format makes it easier to work with large datasets and perform operations.
Dataset: Typed distributed table with named columns. Datasets also represent data in a structured format, but they enforce types at compile time, enhancing safety and performance, especially in statically typed languages like Scala and Java.
Schema
RDD: No schema.
DataFrame: Defined schema (column names).
Dataset: Schema with compile-time type checking (Scala/Java).
Performance
RDD: Less optimized (no Catalyst/Tungsten).
DataFrame: Highly optimized (Catalyst/Tungsten).
Dataset: Highly optimized (Catalyst/Tungsten).
Transformations and Actions
Transformations and Actions are two fundamental operations used to manipulate distributed data collections like RDDs, DataFrames, and Datasets.
High-Level Differences
Transformations: These are lazy operations that define a new RDD, DataFrame, or Dataset by applying a function to an existing one. However, they do not immediately execute—Spark builds a DAG (Directed Acyclic Graph) of all transformations.
Actions: These are eager operations that trigger execution by forcing Spark to compute and return results or perform output operations. Actions break the laziness and execute the transformations.
Key Differences Between Transformations and Actions
Definition
Transformations: Operations that define a new dataset based on an existing one, but do not immediately execute.
Actions: Operations that trigger the execution of transformations and return results or perform output.
Lazy Evaluation
Transformations: Yes; transformations are lazily evaluated and only executed when an action is called.
Actions: No; actions are eager and immediately compute the result by triggering the entire execution plan.
Execution Trigger
Transformations: Do not trigger computation immediately. Spark builds a DAG of transformations to optimize execution.
Actions: Trigger the execution of the transformations and cause Spark to run jobs and return/output data.
Return Type
Transformations: Return a new RDD, DataFrame, or Dataset (these are still "recipes" and not materialized).
Actions: Return a result to the driver program (like a value) or write data to an external storage system.
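A small sketch of this laziness, assuming a DataFrame df with an Age column: the filter() below only builds the plan; nothing runs until the count() action is called.
# Transformation: nothing is executed yet, Spark only records the plan
df_filtered = df.filter(df["Age"] > 30)
# Action: triggers the actual computation and returns a result to the driver
print(df_filtered.count())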