PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.
Visual Summary of Categories
Category | Functions |
---|---|
Basic Functions | alias, cast, lit, col, when, isnull, isnan |
String Functions | concat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap |
Date and Time Functions | current_date, datediff, to_date, year, hour, unix_timestamp, date_format |
Mathematical Functions | abs, round, floor, sqrt, pow, exp, log, sin, cos, rand |
Aggregation Functions | count, sum, avg, min, max, stddev, collect_list |
Array and Map Functions | array, size, array_contains, explode, map_keys, map_values |
Null Handling Functions | isnull, na.fill, na.drop, na.replace |
Window Functions | row_number, rank, ntile, lag, lead, cume_dist, percent_rank |
Statistical Functions | corr, covar_samp, approx_count_distinct, percentile_approx |
UDF and Advanced Functions | udf, udf for SQL (spark.udf.register), pandas_udf, broadcast, schema_of_json, to_json |
Sample DataFrames
df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat| cut|color|clarity|depth|table|price| x| y| z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 1| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43|
| 2| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31|
| 3| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31|
| 4| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63|
| 5| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75|
| 6| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48|
| 7| 0.24|Very Good| I| VVS1| 62.3| 57.0| 336|3.95|3.98|2.47|
dfc:
+---+-----+-----------------------+
|id |color|current_datetime |
+---+-----+-----------------------+
|1 |F |2024-12-03 00:46:02.165|
|2 |E |2024-12-03 00:46:02.165|
|3 |D |2024-12-03 00:46:02.165|
|4 |G |2024-12-03 00:46:02.165|
|6 |J |2024-12-03 00:46:02.165|
|5 |I |2024-12-03 00:46:02.165|
|7 |H |2024-12-03 00:46:02.165|
|8 |K |2024-12-03 00:46:02.165|
|9 |L |2024-12-03 00:46:02.165|
+---+-----+-----------------------+
PySpark datetime related functions
PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.
Datetime Formatting
Common Date Format Patterns:
yyyy: Year
MM: Month
dd: Day
HH: Hour
mm: Minute
ss: Second
from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
.show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime |formatted_date|
+---+-----+-----------------------+--------------+
|1 |F |2024-12-03 00:46:02.165|2024-12-03 |
|2 |E |2024-12-03 00:46:02.165|2024-12-03 |
+---+-----+-----------------------+--------------+
Converting Between Types
- to_date(column, format): Converts a string to a date.
- unix_timestamp(column, format): Converts a string to a Unix timestamp (seconds since 1970-01-01 UTC).
from pyspark.sql.functions import current_date, to_date, unix_timestamp
dfc.select(current_date(),
    to_date(current_date()).alias("date_only"),
    unix_timestamp(current_date()).alias("unix_time"))\
    .show(truncate=False)
+--------------+----------+----------+
|current_date()|date_only |unix_time |
+--------------+----------+----------+
|2024-12-06 |2024-12-06|1733443200|
|2024-12-06 |2024-12-06|1733443200|
- to_timestamp(column, format): Converts a string to a timestamp; if no format is given, the default pattern yyyy-MM-dd HH:mm:ss is assumed.
- from_unixtime(unix_time, format): Converts a Unix timestamp to a string, with default format yyyy-MM-dd HH:mm:ss.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit
df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
| string|
+----------+
|2024-12-05|
+----------+
df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
| string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+
df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
| string|unixTimeStamp|
+----------+-------------+
|2024-12-05| 1733343200|
+----------+-------------+
df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
| 1733343200| 2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
Datetime Arithmetic / Calculations
- date_add ()
- date_sub ()
- add_months ()
df.select(col("input"),
add_months(col("input"),3).alias("add_months"),
add_months(col("input"),-3).alias("sub_months"),
date_add(col("input"),4).alias("date_add"),
date_sub(col("input"),4).alias("date_sub")
).show()
+----------+----------+----------+----------+----------+
| input|add_months|sub_months| date_add| date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )
PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.
from pyspark.sql.functions import col, current_date, datediff
df2 = df.select(
col("date"),
current_date().alias("current_date"),
datediff(current_date(),col("date")).alias("datediff")
)
+----------+------------+--------+
| date|current_date|datediff|
+----------+------------+--------+
|2019-07-01| 2024-12-06| 1985|
|2019-06-24| 2024-12-06| 1992|
|2019-08-24| 2024-12-06| 1931|
+----------+------------+--------+
months_between ( )
The PySpark SQL months_between() function returns the number of months between two dates.
from pyspark.sql.functions import col, current_date, datediff, months_between, round
df3 = df.withColumn("today", current_date())\
.withColumn("monthsDiff", months_between(current_date(), col("date"))) \
.withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
+---+----------+----------+-----------+----------------+
| id| date| today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
| 1|2019-07-01|2024-12-06|65.16129032| 65.16|
| 2|2019-06-24|2024-12-06|65.41935484| 65.42|
| 3|2019-08-24|2024-12-06|63.41935484| 63.42|
+---+----------+----------+-----------+----------------+
Differences Between Dates in Years
Use the months_between() function to get the difference in months, then divide by 12 to convert it into years.
from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit
df4 = df.withColumn("today", current_date()) \
.withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
.withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))
+---+----------+----------+-----------------+---------------+
| id| date| today| yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
| 1|2019-07-01|2024-12-06|5.430107526666667| 5.43|
| 2|2019-06-24|2024-12-06|5.451612903333333| 5.45|
| 3|2019-08-24|2024-12-06|5.284946236666666| 5.28|
+---+----------+----------+-----------------+---------------+
timediff ( )
timediff(column1, column2): Calculates the difference between two time values.
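Depending on the Spark version, a timediff-style function may not be exposed in pyspark.sql.functions. As an alternative, here is a minimal sketch (the start/end columns and values are hypothetical) that computes the difference in seconds with unix_timestamp():
from pyspark.sql.functions import to_timestamp, unix_timestamp
# Hypothetical start/end timestamps; subtracting Unix timestamps yields the difference in seconds
df_t = spark.createDataFrame([("2024-12-03 08:00:00", "2024-12-03 10:30:00")], ["start", "end"]) \
    .withColumn("start", to_timestamp("start")) \
    .withColumn("end", to_timestamp("end"))
df_t.withColumn("diff_seconds", unix_timestamp("end") - unix_timestamp("start")).show()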
trunc ( )
trunc(column, format) truncates a date to the unit given by format, setting it to the first day of the month or year.
e.g. 2024-10-08, truncate to month –> 2024-10-01; truncate to year –> 2024-01-01
df.select(col("input"),
trunc(col("input"),"Year").alias("Month_Year"),
trunc(col("input"),"Month").alias("Month_Trunc")
).show()
+----------+----------+-----------+
| input|Month_Year|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval
INTERVAL literals are used for advanced time calculations. There is no interval() function in pyspark.sql.functions, but interval arithmetic works through Spark SQL expressions (see the sketch below).
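A minimal sketch of interval arithmetic via expr(); the timestamp value below is an assumption for illustration:
from pyspark.sql.functions import expr, to_timestamp
# Interval literals are written inside a SQL expression
df_iv = spark.createDataFrame([("2024-12-03 00:46:02",)], ["ts"]).withColumn("ts", to_timestamp("ts"))
df_iv.select("ts",
    expr("ts + INTERVAL 2 HOURS").alias("plus_2_hours"),
    expr("ts - INTERVAL 30 MINUTES").alias("minus_30_minutes"),
    expr("ts + INTERVAL 1 DAY").alias("plus_1_day")
).show(truncate=False)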
Extracting Components from a Datetime
- year(column): Extracts the year.
- quarter(column): Returns the quarter as an integer from a given date or timestamp.
- dayofyear(column): Extracts the day of the year from a given date or timestamp.
- dayofmonth(column): Extracts the day of the month.
- dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
- weekofyear(column): Returns the week number of the year.
- last_day(column): Returns the last day of the month for a given date or timestamp column. The result is a date column where each date corresponds to the last day of the month for the original dates in the specified column.
- next_day(column, day_of_week): Returns the first date later than the given date that falls on the specified day of the week (e.g. "Mon", "Sunday").
- hour(column): Extracts the hour.
- minute(column): Extracts the minute.
- second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second
df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1 |1 |2 |46 |2 |
|2023-01-11 15:35:32.265|2023|1 |1 |15 |35 |32 |
|2022-09-17 22:16:02.186|2022|3 |9 |22 |16 |2 |
+-----------------------+----+-------+-----+----+------+------+
from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format
df.select(date_format("input","yyy-MM-dd").alias("input"),
dayofweek("input").alias('dayofweek'),
dayofmonth("input").alias('dayofmonth'),
weekofyear("input").alias("weekofyear") ,
next_day("input","mon").alias("nextday"),
last_day("input").alias('lastday')
).show()
+----------+---------+----------+----------+----------+----------+
| input|dayofweek|dayofmonth|weekofyear| nextday| lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01| 2| 1| 1|2024-01-08|2024-01-31|
|2023-01-11| 4| 11| 2|2023-01-16|2023-01-31|
|2022-09-17| 7| 17| 37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
Filtering – current_date, current_timestamp
- current_date (),
- current_timestamp ()
from pyspark.sql.functions import current_date, current_timestamp
dfc.withColumn ("current_date", current_date())\
.withColumn ("current_timestamp", current_timestamp())\
.select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp |
+------------+-----------------------+
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
|2024-12-06 |2024-12-06 14:33:24.419|
+------------+-----------------------+
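Since this section is about filtering, here is a minimal sketch (the order_date column and values are hypothetical) that keeps only rows from the last 30 days using current_date() and date_sub():
from pyspark.sql.functions import col, to_date, current_date, date_sub
# Hypothetical order data; keep rows whose order_date falls within the last 30 days
orders = spark.createDataFrame([(1, "2024-11-20"), (2, "2024-06-01")], ["id", "order_date"]) \
    .withColumn("order_date", to_date("order_date"))
orders.filter(col("order_date") >= date_sub(current_date(), 30)).show()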
PySpark string related functions
btrim (str[, trim] ), trim ( )
btrim(str[, trim]) removes the specified characters from the beginning and end of the string str.
- trim (): Removes only whitespace from the beginning and end of a string. Use it when you only need to clean up whitespace.
- btrim(str[, trim]): Removes all specified leading and trailing characters from a string. Use it when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim, lit
+-------------+
| text |
+-------------+
| hello |
| !!spark!! |
| **PySpark** |
+-------------+
df.withColumn("trimmed", trim("text")).show()
+----------+---------+
| text| trimmed|
+----------+---------+
| hello |hello|
| !!spark!!|!!spark!!|
|**PySpark**|**PySpark**|
+----------+---------+
df.withColumn("trimmed_custom", btrim("text", " !*")).show()
+-----------+--------------+
| text |trimmed_custom|
+-----------+--------------+
| hello |hello|
| !!spark!! |spark|
|**PySpark**|PySpark|
+-----------+--------------+
concat ( ) , concat_ws ()
Both concatenate multiple string columns or expressions into a single string column.
- concat ( ): No delimiter is added between the concatenated values. Use it when you need strict concatenation without any delimiter.
- concat_ws (): Inserts a specified delimiter between the values. Use it when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit
df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
| John| Doe| John Doe|
| Alice| null| null|
| Bob| Smith| Bob Smith|
+----------+---------+----------+
df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
| John| Doe| John Doe|
| Alice| null| Alice|
| Bob| Smith| Bob Smith|
+----------+---------+----------+
concat_ws() skips null values, while concat() returns null if any input value is null.
endswith ( )
endswith() checks whether a string column ends with the specified suffix and returns a boolean.
+--------------+
| text|
+--------------+
| hello world|
|PySpark is fun|
| welcome|
+--------------+
df.select("text",(col("text").endswith("fun")).alias("end_with?")).show ()
+--------------+---------+
| text|end_with?|
+--------------+---------+
| hello world| false|
|PySpark is fun| true|
| welcome| false|
+--------------+---------+
df_filtered = df.filter(df["text"].endswith("fun"))
+--------------+
| text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )
contains() checks whether a PySpark DataFrame column contains a specific string and returns a boolean.
+--------------+
| full_name|
+--------------+
| John Doe|
| Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
| full_name|contain?|
+--------------+--------+
| John Doe| false|
| Jane Smith| true|
|Robert Johnson| false|
+--------------+--------+
df.filter(col("full_name").contains(substring_to_check)).show ()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )
find_in_set(str, str_array): Returns the 1-based index of the specified string (str) in the comma-delimited list (str_array), or 0 if it is not found.
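A minimal sketch, using expr() so it also works on older Spark versions (pyspark.sql.functions.find_in_set is available natively in Spark 3.5+); the sample values are assumptions:
from pyspark.sql.functions import expr
df_fs = spark.createDataFrame([("b", "a,b,c"), ("z", "a,b,c")], ["str", "str_array"])
# Returns the 1-based position of str in the comma-delimited list, or 0 if not found
df_fs.withColumn("position", expr("find_in_set(str, str_array)")).show()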
length ( )
length() returns the number of characters for string data or the number of bytes for binary data.
from pyspark.sql.functions import length
df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
| text|char_length|
+----------+-----------+
| hello| 5|
| PySpark| 7|
|Databricks| 10|
+----------+-----------+
like ( )
like ( ) use && and || operators to have multiple conditions in Scala.
+--------------+
|          name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
| name| R%| %th|%John%|
+--------------+-----+-----+------+
| John Doe|false|false| true|
| Jane Smith|false| true| false|
|Robert Johnson| true|false| true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
| name|
+--------------+
|Robert Johnson|
+--------------+
startswith ( )
startswith() checks whether a string column begins with the specified prefix and returns a boolean.
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Michael | Rose| |40288| M| 4000|
| James | | Smith|36636| M| 3000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael | Rose| |40288| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )
substring(str, pos[, len]): Returns the substring of str that starts at pos and is of length len.
+----------+-----+---+------+
| fname|lname| id|gender|
+----------+-----+---+------+
| James| Bond|100| null|
| Ann|Varsa|200| F|
|Tom Cruise| XXX|400| |
| Tom Brand| null|400| M|
+----------+-----+---+------+
from pyspark.sql.functions import substring
df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
| Ja| Ja|
| An| An|
| To| To|
| To| To|
+------+---------+
split ( )
split() splits a string column around a delimiter into an array column, which can then be expanded into multiple columns.
from pyspark.sql.functions import split
df_with_split = df.select("full_name", split(df["full_name"], ",").alias("split_names")).show()
+--------------+-----------------+
| full_name| split_names|
+--------------+-----------------+
| John,Doe| [John, Doe]|
| Jane,Smith| [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+
split_columns = split(df["full_name"], ",")
df_with_split = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_with_split.show()
+--------------+----------+---------+
| full_name|first_name|last_name|
+--------------+----------+---------+
| John,Doe| John| Doe|
| Jane,Smith| Jane| Smith|
|Robert,Johnson| Robert| Johnson|
+--------------+----------+---------+
df_with_names = df.withColumn("split_names", split(df["full_name"], ","))
df_expanded = df_with_names.select(
"full_name",
df_with_names["split_names"].getItem(0).alias("first_name"),
df_with_names["split_names"].getItem(1).alias("last_name")
)
df_expanded.show()
+--------------+----------+---------+
| full_name|first_name|last_name|
+--------------+----------+---------+
| John,Doe| John| Doe|
| Jane,Smith| Jane| Smith|
|Robert,Johnson| Robert| Johnson|
+--------------+----------+---------+
translate ( )
translate() replaces characters in a DataFrame column value character by character: each character in the matching string is replaced by the character at the same position in the replacement string.
from pyspark.sql.functions import translate
d.withColumn ("replaced", translate("new_color", "aoml","A0N_")).show(3)
+-----+----------------+----------------+
|color| new_color| replaced|
+-----+----------------+----------------+
| F|almost colorless|A_N0st c0_0r_ess|
| E| null| null|
| D| colorless| c0_0r_ess|
+-----+----------------+----------------+
regexp_replace ( )
regexp_replace() replaces substrings of a column value that match a regular expression with another string.
+---+------------------+-----+
| id| address|state|
+---+------------------+-----+
| 1| 14851 Jeffrey Rd| DE|
| 2|43421 Margarita St| NY|
| 3| 13111 Siemon Ave| CA|
+---+------------------+-----+
from pyspark.sql.functions import regexp_replace
df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
.show(truncate=False)
+---+------------------+-----+
|id |address |state|
+---+------------------+-----+
|1 |14851 Jeffrey Road|DE |
|2 |43421 Margarita St|NY |
|3 |13111 Siemon Ave |CA |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address',
when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
.when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
.when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
.otherwise(df.address)) \
.show(truncate=False)
+---+----------------------+-----+
|id |address |state|
+---+----------------------+-----+
|1 |14851 Jeffrey Road |DE |
|2 |43421 Margarita Street|NY |
|3 |13111 Siemon Avenue |CA |
+---+----------------------+-----+
overlay ( )
overlay(src, replace, pos[, len]) replaces part of the src string column with the value of the replace column, starting at position pos.
+---------------+----+
| col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay
df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
| overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
upper ( ), lower ( ), initcap ( )
- upper: Converts all characters in the column to uppercase.
- lower: Converts all characters in the column to lowercase.
- initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
| text|
+-----------------+
| hello world|
|spark sql example|
| UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap
df.withColumn("Uppercase", upper("text"))\
.withColumn("lowercase", lower("text"))\
.withColumn("Capitalized", initcap("text"))\
.show()
+-----------------+-----------------+-----------------+-----------------+
| text| Uppercase| lowercase| Capitalized|
+-----------------+-----------------+-----------------+-----------------+
| hello world| HELLO WORLD| hello world| Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
| UPPER and LOWER| UPPER AND LOWER| upper and lower| Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+
Numeric Functions
Mathematical operations on numeric columns.
abs()
abs(): Absolute value.
from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )
round(): Round to a specific number of decimals.
from pyspark.sql.functions import round
df.select(round(df["column"], 2))
pow ( )
pow()
: Power function.
from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))
Aggregate Functions
sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|James |Sales |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
|Saif |Sales |4100 |
+-------------+----------+------+
distinct, countDistinct, approx_count_distinct
- approx_count_distinct (): returns an approximate count of distinct items in a group (faster on large datasets)
- countDistinct (): returns the exact count of distinct items in a group
- distinct ( ): returns the distinct rows of a DataFrame
from pyspark.sql.functions import approx_count_distinct, countDistinct
df.select(approx_count_distinct("salary").alias("approx_count_distinct"),
countDistinct("salary").alias("countDistinct"),
).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
| 6| 6|
+---------------------+-------------+
df.select("salary").distinct().show()
+------+
|salary|
+------+
| 3000|
| 4600|
| 4100|
| 3300|
| 3900|
| 2000|
+------+
avg, sum, sumDistinct, max (), min (), mean ( )
- avg ( ): average of values in the input column
- sum ( )
- sumDistinct ( ): returns the sum of all distinct values in a column.
- max ( )
- min ( )
- mean ( )
from pyspark.sql.functions import avg,sum, max, min
df.select(avg("salary").alias("avg"),
sum("salary").alias("sum"),
max("salary").alias("max"),
min("salary").alias("min")
).show()
+------+-----+----+----+
| avg| sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+
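The example above aggregates over the whole DataFrame; a minimal sketch of the same functions applied per group with groupBy(), using the sample employee DataFrame:
from pyspark.sql.functions import avg, sum, max, min
df.groupBy("department").agg(
    avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
).show()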
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900 |
+--------------------+
from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
sample df
+---+-----+
| id|value|
+---+-----+
| 1| 10|
| 2| 20|
| 3| 30|
| 4| null|
+---+-----+
mean() specifically calculates the mean. It is used with select() or groupBy() and returns the mean of the specified columns.
+----------+
|mean_value|
+----------+
| 20.0|
+----------+
first (), last ()
- first() returns the first element in a column. When ignorenulls is set to True, it returns the first non-null element.
- last() returns the last element in a column. When ignorenulls is set to True, it returns the last non-null element.
from pyspark.sql.functions import first, last
df.select(first("salary").alias("first"),
last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
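A minimal sketch of the ignorenulls option mentioned above, assuming a hypothetical bonus column that contains nulls:
from pyspark.sql.functions import first, last
# With ignorenulls=True the first/last non-null value is returned instead of a possible null
df.select(first("bonus", ignorenulls=True).alias("first_non_null"),
    last("bonus", ignorenulls=True).alias("last_non_null")
).show()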
collect_list ( )
PySpark – collect_list() returns all values from an input column with duplicates.
from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary) |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )
PySpark – collect_set() returns all values from an input column without duplicates.
from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary) |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+
PySpark Window functions
PySpark’s Window ranking functions, like row_number(), rank(), and dense_rank(), assign sequential numbers to DataFrame rows based on specified criteria within defined partitions. These functions enable sorting and ranking operations, identifying row positions in partitions based on specific orderings.
- row_number() assigns unique sequential numbers,
- rank() provides the ranking with gaps,
- dense_rank() offers ranking without gaps.
row_number ()
row_number() window function gives the sequential row number starting from 1 to the result of each window partition.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
windowSpec = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
.where(col("row_number") < 4).show()
+---+---------+-----+----------+
|_c0| cut|color|row_number|
+---+---------+-----+----------+
|677| Fair| D| 1|
|772| Fair| D| 2|
|940| Fair| D| 3|
| 43| Good| D| 1|
| 44| Good| D| 2|
|239| Good| D| 3|
| 63| Ideal| D| 1|
| 64| Ideal| D| 2|
|121| Ideal| D| 3|
| 55| Premium| D| 1|
| 62| Premium| D| 2|
|151| Premium| D| 3|
| 29|Very Good| D| 1|
| 35|Very Good| D| 2|
| 39|Very Good| D| 3|
+---+---------+-----+----------+
rank ()
rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.
from pyspark.sql.functions import rank
from pyspark.sql.window import Window
windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()
+-----+---------+-----+-----+----+
| _c0| cut|color|price|rank|
+-----+---------+-----+-----+----+
| 29|Very Good| D| 357| 1|
|28262|Very Good| D| 357| 1|
|28272| Good| D| 361| 3|
|28273|Very Good| D| 362| 4|
|28288|Very Good| D| 367| 5|
|31598| Ideal| D| 367| 5|
|31601| Premium| D| 367| 5|
|31602| Premium| D| 367| 5|
|31618|Very Good| D| 373| 9|
|34922|Very Good| D| 373| 9|
|38277| Premium| D| 386| 11|
|38278| Premium| D| 386| 11|
|38279| Premium| D| 386| 11|
|38280| Premium| D| 386| 11|
|41581|Very Good| D| 388| 15|
|41582|Very Good| D| 388| 15|
dense_rank ()
dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps.
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
.select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
| _c0|color|price|dense_rank|
+-----+-----+-----+----------+
| 29| D| 357| 1|
|28262| D| 357| 1|
|28272| D| 361| 2|
|28273| D| 362| 3|
|28288| D| 367| 4|
|31598| D| 367| 4|
|31601| D| 367| 4|
|31602| D| 367| 4|
|31618| D| 373| 5|
|34922| D| 373| 5|
|38277| D| 386| 6|
|38278| D| 386| 6|
|38279| D| 386| 6|
percent_rank ()
percent_rank() returns the relative rank of a row within a window partition as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).
from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
.select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0 |color|price|percent_rank |
+-----+-----+-----+---------------------+
|29 |D |357 |0.0 |
|28262|D |357 |0.0 |
|28272|D |361 |2.952465308532625E-4 |
|28273|D |362 |4.428697962798937E-4 |
|28288|D |367 |5.90493061706525E-4 |
|31598|D |367 |5.90493061706525E-4 |
|31601|D |367 |5.90493061706525E-4 |
|31602|D |367 |5.90493061706525E-4 |
|31618|D |373 |0.00118098612341305 |
|34922|D |373 |0.00118098612341305 |
|38277|D |386 |0.0014762326542663124|
|38278|D |386 |0.0014762326542663124|
lag (), lead ( )
- lag ( ) function allows you to access a previous row’s value within the partition based on a specified offset.
- lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag,lead
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
.withColumn("lead",lead("salary",2).over(windowSpec))\
.show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
| Maria| Finance| 3000|null|3900|
| Scott| Finance| 3300|null|null|
| Jen| Finance| 3900|3000|null|
| Kumar| Marketing| 2000|null|null|
| Jeff| Marketing| 3000|null|null|
| James| Sales| 3000|null|4100|
| James| Sales| 3000|null|4100|
| Robert| Sales| 4100|3000|4600|
| Saif| Sales| 4100|3000|null|
| Michael| Sales| 4600|4100|null|
+-------------+----------+------+----+----+
ntile ( )
ntile(n) divides the rows in each window partition into n buckets and returns the bucket number (from 1 to n) for each row.
from pyspark.sql.functions import ntile
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
.show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
| Maria| Finance| 3000| 1|
| Scott| Finance| 3300| 1|
| Jen| Finance| 3900| 2|
| Kumar| Marketing| 2000| 1|
| Jeff| Marketing| 3000| 2|
| James| Sales| 3000| 1|
| James| Sales| 3000| 1|
| Robert| Sales| 4100| 1|
| Saif| Sales| 4100| 2|
| Michael| Sales| 4600| 2|
+-------------+----------+------+-----+
PySpark json related functions
sample data
+---+--------------------------------------------------------------------------+
|id |value |
+---+--------------------------------------------------------------------------+
|1 |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )
The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.
from pyspark.sql.functions import explode
explode(col)
- col: The name of the column or an expression containing an array or map to be exploded.
Returns a new row for each element in the array or each key-value pair in the map.
# Usage with Arrays:
# Sample data
data = [
(1, ["a", "b", "c"]),
(2, ["d", "e"]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "letters"])
+---+------+
| id|letter|
+---+------+
| 1| a|
| 1| b|
| 1| c|
| 2| d|
| 2| e|
+---+------+
# Usage with Maps:
# Sample data
data = [
(1, {"key1": "value1", "key2": "value2"}),
(2, {"key3": "value3"}),
(3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])
# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+-------+
| id| key| value|
+---+----+-------+
| 1|key1| value1|
| 1|key2| value2|
| 2|key3| value3|
+---+----+-------+
Working with JSON
Exploding a JSON Array
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import ArrayType, StringType
# Sample JSON data
data = [
('{"id": 1, "values": ["a", "b", "c"]}',),
('{"id": 2, "values": ["d", "e"]}',),
('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]} |
|{"id": 3, "values": []} |
+------------------------------------+
# Define schema for JSON column
json_schema = "struct<id:int, values:array<string>>"
# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*") # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values |
+---+---------+
|1 |[a, b, c]|
|2 |[d, e] |
|3 |[] |
+---+---------+
# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
| 1| a|
| 1| b|
| 1| c|
| 2| d|
| 2| e|
+---+-----+
Exploding a JSON Map
from pyspark.sql.types import MapType, StringType
# Sample JSON data
data = [
('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
('{"id": 2, "properties": {"key3": "value3"}}',),
('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}} |
|{"id": 3, "properties": {}} |
+-------------------------------------------------------------+
# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"
# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*") # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties |
+---+--------------------------------+
|1 |{key1 -> value1, key2 -> value2}|
|2 |{key3 -> value3} |
|3 |{} |
+---+--------------------------------+
# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
| 1|key1|value1|
| 1|key2|value2|
| 2|key3|value3|
+---+----+------+
- Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
- Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed (see the sketch below).
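A minimal sketch of nested from_json and explode for a hypothetical JSON structure holding an array of structs:
from pyspark.sql.functions import from_json, explode, col
data = [('{"id": 1, "items": [{"sku": "a", "qty": 2}, {"sku": "b", "qty": 1}]}',)]
df_n = spark.createDataFrame(data, ["json_data"])
# Schema with an array of structs; parse, then explode the array and flatten the struct fields
nested_schema = "struct<id:int, items:array<struct<sku:string, qty:int>>>"
parsed = df_n.withColumn("parsed", from_json(col("json_data"), nested_schema)).select("parsed.*")
parsed.select("id", explode("items").alias("item")) \
    .select("id", "item.sku", "item.qty") \
    .show()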
from_json ( )
from_json ( ): Converts JSON string into Struct type or Map type.
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
|-- id: long (nullable = true)
|-- value: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+---+---------------------------------------------------------------------------+
|id |value |
+---+---------------------------------------------------------------------------+
|1 |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object
get_json_object() extracts a JSON element from a JSON string based on the specified JSON path.
from pyspark.sql.functions import get_json_object, col
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
.show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1 |STANDARD |
+---+-----------+
json_tuple ( )
json_tuple() extracts data from a JSON string and creates new columns for the specified fields.
from pyspark.sql.functions import json_tuple
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
.toDF("id","Zipcode","ZipCodeType","City") \
.show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City |
+---+-------+-----------+-----------+
|1 |704 |STANDARD |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )
to_json() is used to convert MapType or StructType DataFrame columns to a JSON string.
from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
.show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value |
+---+----------------------------------------------------------------------------+
|1 |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )
schema_of_json ( ) function in PySpark is used to infer the schema of a JSON string column or JSON string literal. It is particularly useful when you want to work with complex JSON data and need to define its schema for operations like parsing or transformation. Return a string representation of the schema in the DataType JSON format.
pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) → Column
from pyspark.sql.functions import schema_of_json, lit, col
# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'),
("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]
df = spark.createDataFrame(data, columns)
# Infer the schema from a sample JSON string (schema_of_json expects a literal / foldable string)
schema = df.select(schema_of_json(lit('{"name": "Alice", "age": 30}'))).first()[0]
print(schema)
# e.g. STRUCT<age: BIGINT, name: STRING>
PySpark expr()
expr() is a SQL function to execute SQL-like expressions and to use an existing DataFrame column value as an expression argument to PySpark built-in functions.
#Using CASE WHEN similar to SQL.
from pyspark.sql.functions import expr
df2=df.withColumn("gender", \
expr("CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END") \
)
+-------+-------+
| name| gender|
+-------+-------+
| James| Male|
|Michael| Female|
| Jen|unknown|
+-------+-------+
#Add Month value from another column
df.select(df.date,df.increment,
expr("add_months(date,increment)")
.alias("inc_date")).show()
+----------+---------+----------+
| date|increment| inc_date|
+----------+---------+----------+
|2019-01-23| 1|2019-02-23|
|2019-06-24| 2|2019-08-24|
|2019-09-20| 3|2019-12-20|
+----------+---------+----------+
PySpark SQL functions lit() and typedLit() are used to add a new column to a DataFrame by assigning a literal or constant value. Both functions return a Column type. typedLit() provides a way to be explicit about the data type of the constant value being added to the DataFrame.
lit ()
from pyspark.sql.functions import col,lit
df.select(col("EmpId"),col("Salary"),lit("1").alias("lit_value1"))
df.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
| 111| 50000| 1|
| 222| 60000| 1|
| 333| 40000| 1|
+-----+------+----------+
typedLit ()
typedLit() provides a way to be explicit about the data type of the constant value being added to a DataFrame. The difference between lit() and typedLit() is that typedLit() can also handle collection types, e.g. Array, Dictionary (map), etc. Note that typedLit() comes from the Spark Scala API and is not exposed in pyspark.sql.functions; a PySpark alternative is sketched after this example. Below is an example usage of typedLit():
df4 = df4.withColumn("lit_value3", typedLit("flag", StringType()))
df4.show(truncate=False)
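In PySpark itself, a collection literal can be added without typedLit() by combining lit() with array() or create_map(); a minimal sketch (df4 and the new column names are assumptions):
from pyspark.sql.functions import array, create_map, lit
df4 = df4.withColumn("lit_array", array(lit(1), lit(2), lit(3))) \
    .withColumn("lit_map", create_map(lit("key"), lit("value")))
df4.show(truncate=False)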
Stack ( )
The stack() function is used to transform columns into rows. It's particularly useful when you have a wide DataFrame (with many columns) and want to "unpivot" or "melt" it into a longer format.
Syntax
stack(n: Int, exprs: String*): Column
Parameters
- n: The number of rows to create per input row; the exprs values are distributed across these n rows.
- exprs: A sequence of column-value pairs, typically specified in the format 'column_name', column_value.
+---+---+---+---+
| id| A| B| C|
+---+---+---+---+
| 1|100|200|300|
| 2|400|500|600|
+---+---+---+---+
# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
"id",
"stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
| 1| A| 100|
| 1| B| 200|
| 1| C| 300|
| 2| A| 400|
| 2| B| 500|
| 2| C| 600|
+---+--------+-----+
StructType & StructField
The StructType and StructField classes in PySpark are used to specify a custom schema for a DataFrame and to create complex columns like nested struct, array, and map columns. StructType is a collection of StructField objects that define the column name, column data type, a boolean to specify whether the field can be nullable, and metadata.
Simple StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Simple StructType and StructField
data = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data, schema=schema)
Nested StructType object struct
# Defining schema using nested StructType
structureData = [
(("James","","Smith"),"36636","M",3100),
(("Michael","Rose",""),"40288","M",4300),
(("Robert","","Williams"),"42114","M",1400),
(("Maria","Anne","Jones"),"39192","F",5500),
(("Jen","Mary","Brown"),"","F",-1)
]
structureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('id', StringType(), True),
StructField('gender', StringType(), True),
StructField('salary', IntegerType(), True)
])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
when ()
when() is similar to CASE WHEN in SQL and if/else in programming languages: conditions are evaluated in order and otherwise() supplies the default value.
from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
when(dfc.color == "F", "almost colorless")
. when(dfc.color == "D", "colorless")
. when(dfc.color == "G", "indistinguishable colorless")
. when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
.otherwise ("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color |
+-----+---------------------------+
|F |almost colorless |
|E |very colorful |
|D |colorless |
|G |indistinguishable colorless|
|J |almost colorless |
|I |almost colorless |
|H |very colorful |
|K |very colorful |
|L |very colorful |
+-----+---------------------------+
df3 = df.withColumn("new_gender", expr(
"CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female'
WHEN gender IS NULL THEN ''" +
"ELSE gender END"))
df.createOrReplaceTempView("EMP")
spark.sql("select name,
CASE WHEN gender = 'M' THEN 'Male' " +
"WHEN gender = 'F' THEN 'Female'
WHEN gender IS NULL THEN ''" +
"ELSE gender END as new_gender from EMP").show()
Note: the two example segments above build the CASE WHEN expression by concatenating the SQL string piece by piece with the + operator.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
(remove all space from the email account 😊)