PySpark Built-in Functions

PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.

Visual Summary of Categories

  • Basic Functions: alias, cast, lit, col, when, isnull, isnan
  • String Functions: concat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap
  • Date and Time Functions: current_date, datediff, to_date, year, hour, unix_timestamp, date_format
  • Mathematical Functions: abs, round, floor, sqrt, pow, exp, log, sin, cos, rand
  • Aggregation Functions: count, sum, avg, min, max, stddev, collect_list
  • Array and Map Functions: array, size, array_contains, explode, map_keys, map_values
  • Null Handling Functions: isnull, na.fill, na.drop, na.replace
  • Window Functions: row_number, rank, ntile, lag, lead, cume_dist, percent_rank
  • Statistical Functions: corr, covar_samp, approx_count_distinct, percentile_approx
  • UDF and Advanced Functions: udf, udf for SQL, pandas_udf, broadcast, schema_of_json, to_json
Sample DataFrames

df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
dfc:
+---+-----+-----------------------+
|id |color|current_datetime       |
+---+-----+-----------------------+
|1  |F    |2024-12-03 00:46:02.165|
|2  |E    |2024-12-03 00:46:02.165|
|3  |D    |2024-12-03 00:46:02.165|
|4  |G    |2024-12-03 00:46:02.165|
|6  |J    |2024-12-03 00:46:02.165|
|5  |I    |2024-12-03 00:46:02.165|
|7  |H    |2024-12-03 00:46:02.165|
|8  |K    |2024-12-03 00:46:02.165|
|9  |L    |2024-12-03 00:46:02.165|
+---+-----+-----------------------+

PySpark datetime related functions

PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.

Datetime Formatting

Common Date Format Patterns:
  • yyyy: Year
  • MM: Month
  • dd: Day
  • HH: Hour
  • mm: Minute
  • ss: Second

from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
          .show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime       |formatted_date|
+---+-----+-----------------------+--------------+
|1  |F    |2024-12-03 00:46:02.165|2024-12-03    |
|2  |E    |2024-12-03 00:46:02.165|2024-12-03    |
+---+-----+-----------------------+--------------+
Converting Between Types
  • to_date(column, format): Converts a string to a date.
  • unix_timestamp(column, format): Converts a string to a Unix timestamp.
from pyspark.sql.functions import to_date, unix_timestamp

dfc.withColumn("date_only", to_date("current_date")) \
    .withColumn("unix_time", unix_timestamp("current_date"))\          .select("current_date","date_only","unix_time")\
.show(truncate=False)
+--------------+----------+----------+
|current_date()|date_only |unix_time |
+--------------+----------+----------+
|2024-12-06    |2024-12-06|1733443200|
|2024-12-06    |2024-12-06|1733443200|
  • to_timestamp(column, format): Converts a string to a timestamp; the default format is yyyy-MM-dd HH:mm:ss.
  • from_unixtime(unix_time, format): Converts a Unix timestamp to a string.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit

df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
|    string|
+----------+
|2024-12-05|
+----------+

df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
|    string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+

df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
|    string|unixTimeStamp|
+----------+-------------+
|2024-12-05|   1733343200|
+----------+-------------+

df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
|   1733343200|                              2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
Datetime Arithmetic / Calculations
  • date_add ()
  • date_sub ()
  • add_months ()

df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()
+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )

PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.

from pyspark.sql.functions import col, current_date, datediff

df2 = df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(),col("date")).alias("datediff")
    )
+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2024-12-06|    1985|
|2019-06-24|  2024-12-06|    1992|
|2019-08-24|  2024-12-06|    1931|
+----------+------------+--------+
months_between ( )

The PySpark SQL months_between() function returns the number of months between two dates.

from pyspark.sql.functions import col, current_date, datediff, months_between, round

df3 = df.withColumn("today", current_date())\
    .withColumn("monthsDiff", months_between(current_date(), col("date"))) \
    .withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
+---+----------+----------+-----------+----------------+
| id|      date|     today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
|  1|2019-07-01|2024-12-06|65.16129032|           65.16|
|  2|2019-06-24|2024-12-06|65.41935484|           65.42|
|  3|2019-08-24|2024-12-06|63.41935484|           63.42|
+---+----------+----------+-----------+----------------+
Differences Between Dates in Years

Use the months_between() function to get the difference in months, then divide by 12 to convert it into years.

from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit

df4 = df.withColumn("today", current_date()) \
  .withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
  .withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))

+---+----------+----------+-----------------+---------------+
| id|      date|     today|        yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
|  1|2019-07-01|2024-12-06|5.430107526666667|           5.43|
|  2|2019-06-24|2024-12-06|5.451612903333333|           5.45|
|  3|2019-08-24|2024-12-06|5.284946236666666|           5.28|
+---+----------+----------+-----------------+---------------+
timediff ( )

timediff(column1, column2) calculates the difference between two time values; note that it is not available as a DataFrame function in every Spark version. A common alternative is to subtract unix_timestamp() values to get the difference in seconds, as sketched below.
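A minimal sketch of the unix_timestamp() subtraction pattern (the columns start_ts and end_ts and their values are illustrative only):

from pyspark.sql.functions import unix_timestamp

df_times = spark.createDataFrame(
    [("2024-12-05 10:00:00", "2024-12-05 12:30:00")], ["start_ts", "end_ts"])

# 2.5 hours apart -> 9000 seconds
df_times.select(
    (unix_timestamp("end_ts") - unix_timestamp("start_ts")).alias("diff_seconds")
).show()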

trunc ( )

trunc(column, format) truncates a date to the unit given by format (e.g. month or year), returning the first day of that month or year.
e.g. 2024-10-08, truncate to month –> 2024-10-01; truncate to year –> 2024-01-01

df.select(col("input"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()
+----------+----------+-----------+
|     input|Month_Year|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval

INTERVAL is used for advanced time calculations. It is not a standalone DataFrame function, but it can be used inside SQL expressions (for example via expr()) in PySpark.
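A minimal sketch using expr() with INTERVAL literals (reusing the dfc DataFrame from above; the aliases are illustrative):

from pyspark.sql.functions import expr

dfc.select("current_datetime",
    expr("current_datetime + INTERVAL 2 DAYS").alias("plus_2_days"),
    expr("current_datetime - INTERVAL 3 HOURS").alias("minus_3_hours")
).show(2, truncate=False)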


Extracting Components from a Datetime
  • year(column): Extracts the year.
  • quarter(column): Returns the quarter as an integer from a given date or timestamp.
  • dayofyear(column): Extracts the day of the year from a given date or timestamp.
  • dayofmonth(column): Extracts the day of the month.
  • dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
  • weekofyear(column): Returns the week number of the year.
  • last_day(column): Returns the last day of the month for a given date or timestamp column. The result is a date column where each date is the last day of the month for the corresponding original date.
  • next_day(column, day_of_week): Returns the first date later than the given date that falls on the specified day of the week (e.g. "Mon", "Sunday").
  • hour(column): Extracts the hour.
  • minute(column): Extracts the minute.
  • second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second

df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input                  |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1      |1    |2   |46    |2     |
|2023-01-11 15:35:32.265|2023|1      |1    |15  |35    |32    |
|2022-09-17 22:16:02.186|2022|3      |9    |22  |16    |2     |
+-----------------------+----+-------+-----+----+------+------+

from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format

df.select(date_format("input","yyy-MM-dd").alias("input"), 
    dayofweek("input").alias('dayofweek'), 
     dayofmonth("input").alias('dayofmonth'),
     weekofyear("input").alias("weekofyear") ,
     next_day("input","mon").alias("nextday"),
     last_day("input").alias('lastday')
  ).show()
+----------+---------+----------+----------+----------+----------+
|     input|dayofweek|dayofmonth|weekofyear|   nextday|   lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01|        2|         1|         1|2024-01-08|2024-01-31|
|2023-01-11|        4|        11|         2|2023-01-16|2023-01-31|
|2022-09-17|        7|        17|        37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
Getting the current date and timestamp – current_date, current_timestamp
  • current_date (),
  • current_timestamp ()
from pyspark.sql.functions import current_date, current_timestamp

dfc.withColumn ("current_date", current_date())\
    .withColumn ("current_timestamp", current_timestamp())\
    .select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp      |
+------------+-----------------------+
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
+------------+-----------------------+

PySpark string related functions

btrim (str[, trim] ), trim ( )

btrim(str[, trim]) removes the specified leading and trailing characters from the beginning and end of the string str.

  • trim ( ): Removes only whitespace from the beginning and end of a string.
    Use it when you only need to clean up whitespace.
  • btrim(str[, trim]): Removes all specified leading and trailing characters from a string.
    Use it when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim, lit
+-------------+
|      text   |
+-------------+
|   hello     |
|  !!spark!!  |
| **PySpark** |
+-------------+
df.withColumn("trimmed", trim("text")).show()
+-------------+-----------+
|         text|    trimmed|
+-------------+-----------+
|   hello     |      hello|
|  !!spark!!  |  !!spark!!|
| **PySpark** |**PySpark**|
+-------------+-----------+
df.withColumn("trimmed_custom", btrim("text", " !*")).show()
+-----------+--------------+
|      text |trimmed_custom|
+-----------+--------------+
|   hello   |hello|
| !!spark!! |spark|
|**PySpark**|PySpark|
+-----------+--------------+
concat ( ) , concat_ws ()

Both concatenate multiple string columns or expressions into a single string column; concat_ws() inserts a specified delimiter between the values.

  • concat ( ): No delimiter is added between the concatenated values.
    Use it when you need strict concatenation without any delimiters.
  • concat_ws ( ): Adds a specified delimiter between the values.
    Use it when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit

df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|      null|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+

df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|     Alice|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+
Note: concat_ws() skips NULL values, while concat() returns NULL if any input is NULL.
endswith ( )

endswith() checks whether a column value ends with the given suffix and returns a boolean.

+--------------+
|          text|
+--------------+
|   hello world|
|PySpark is fun|
|       welcome|
+--------------+
df.select("text",(col("text").endswith("fun")).alias("end_with?")).show ()
+--------------+---------+
|          text|end_with?|
+--------------+---------+
|   hello world|    false|
|PySpark is fun|     true|
|       welcome|    false|
+--------------+---------+

df_filtered = df.filter(df["text"].endswith("fun"))
+--------------+
|          text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )

contains () checks whether a PySpark DataFrame column contains a specific string and returns a boolean.

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
|     full_name|contain?|
+--------------+--------+
|      John Doe|   false|
|    Jane Smith|    true|
|Robert Johnson|   false|
+--------------+--------+

df.filter(col("full_name").contains(substring_to_check)).show ()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )

find_in_set(str, str_array) returns the 1-based index of the specified string (str) in the comma-delimited list (str_array), or 0 if it is not found.
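find_in_set() can be used through expr(); a minimal sketch reusing the dfc DataFrame from above (the search list is illustrative):

from pyspark.sql.functions import expr

dfc.select("color",
    expr("find_in_set(color, 'D,E,F,G')").alias("position")
).show(3)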

length ( )

length ( ) Provides the length of characters for string data or the number of bytes for binary data.

from pyspark.sql.functions import length

df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
|      text|char_length|
+----------+-----------+
|     hello|          5|
|   PySpark|          7|
|Databricks|         10|
+----------+-----------+
like ( )

like ( ) use && and || operators to have multiple conditions in Scala.

+--------------+
|          name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
|          name|   R%|  %th|%John%|
+--------------+-----+-----+------+
|      John Doe|false|false|  true|
|    Jane Smith|false| true| false|
|Robert Johnson| true|false|  true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
|          name|
+--------------+
|Robert Johnson|
+--------------+
startswith ( )

startswith() checks whether a column value begins with the given prefix and returns a boolean.

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )

substring (str, pos[, len]): Returns the substring of str that starts at position pos (1-based) and is of length len.

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+
from pyspark.sql.functions import substring

df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
|    Ja|       Ja|
|    An|       An|
|    To|       To|
|    To|       To|
+------+---------+
split ( )

PySpark split() splits a string column into an array of substrings based on a delimiter; the array can then be expanded into multiple columns.

from pyspark.sql.functions import split

df_with_split = df.select("full_name", split(df["full_name"], ",").alias("split_names"))
df_with_split.show()
+--------------+-----------------+
|     full_name|      split_names|
+--------------+-----------------+
|      John,Doe|      [John, Doe]|
|    Jane,Smith|    [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+

split_columns = split(df["full_name"], ",")
df_split_cols = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_split_cols.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+

df_with_split.select(
    "full_name",
    df_with_split["split_names"].getItem(0).alias("first_name"),
    df_with_split["split_names"].getItem(1).alias("last_name")
).show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+
translate ( )

translate() replaces characters in a string column one for one: each character in the matching string is replaced by the character at the same position in the replacement string.

from pyspark.sql.functions import translate

# d is a DataFrame with "color" and "new_color" columns (see the when() example later in this post)
d.withColumn("replaced", translate("new_color", "aoml", "A0N_")).show(3)
+-----+----------------+----------------+
|color|       new_color|        replaced|
+-----+----------------+----------------+
|    F|almost colorless|A_N0st c0_0r_ess|
|    E|            null|            null|
|    D|       colorless|       c0_0r_ess|
+-----+----------------+----------------+
regexp_replace ( )

PySpark regexp_replace() replaces the parts of a column value that match a regular expression with another string or substring.

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

from pyspark.sql.functions import regexp_replace

df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)
+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address', 
    when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
   .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False)
+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+
overlay

overlay(src, replace, pos[, len]) replaces part of a string column with the value of another column, starting at the specified 1-based position.

+---------------+----+
|           col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay

df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
|      overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
upper ( ), lower ( ), initcap ( )
  • upper: Converts all characters in the column to uppercase.
  • lower: Converts all characters in the column to lowercase.
  • initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
|             text|
+-----------------+
|      hello world|
|spark sql example|
|  UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap

df.withColumn("Uppercase", upper("text"))\
   .withColumn("lowercase", lower("text"))\
    .withColumn("Capitalized", initcap("text"))\
    .show()
+-----------------+-----------------+-----------------+-----------------+
|             text|        Uppercase|        lowercase|      Capitalized|
+-----------------+-----------------+-----------------+-----------------+
|      hello world|      HELLO WORLD|      hello world|      Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
|  UPPER and LOWER|  UPPER AND LOWER|  upper and lower|  Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+

Numeric Functions

Mathematical operations on numeric columns.

abs()

abs(): Absolute value.

from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )

round(): Round to a specific number of decimals.

from pyspark.sql.functions import round
df.select(round(df["column"], 2))
pow ( )

pow(): Power function.

from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))
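A combined sketch over the diamonds sample DataFrame df from the top of this post (the chosen columns and aliases are illustrative):

from pyspark.sql.functions import col, round, floor, sqrt, pow

df.select(
    round(col("depth"), 1).alias("depth_rounded"),
    floor(col("table")).alias("table_floor"),
    sqrt(col("price")).alias("price_sqrt"),
    pow(col("carat"), 2).alias("carat_squared")
).show(3)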

Aggregate Functions

sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
distinct, countDistinct, approx_count_distinct
  • approx_count_distinct (): returns an approximate count of distinct items in a group (faster on large data).
  • countDistinct (): returns the exact count of distinct items in a group.
  • distinct ( ): returns the distinct rows of a DataFrame.
from pyspark.sql.functions import approx_count_distinct, countDistinct

df.select(approx_count_distinct("salary").alias("approx_count_distinct"), 
          countDistinct("salary").alias("countDistinct"),
          ).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
|                    6|            6|
+---------------------+-------------+

df.select("salary").distinct().show()
+------+
|salary|
+------+
|  3000|
|  4600|
|  4100|
|  3300|
|  3900|
|  2000|
+------+
avg, sum, sumDistinct, max (), min (), mean ( )
  • avg ( ): average of values in the input column
  • sum ( )
  • sumDistinct ( ): returns the sum of all distinct values in a column.
  • max ( )
  • min ( )
  • mean ( )
from pyspark.sql.functions import avg,sum, max, min

df.select(avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
    ).show()
+------+-----+----+----+
|   avg|  sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+

sample df
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4| null|
+---+-----+
mean() calculates the mean of a column (nulls are ignored). It is used with select() or groupBy() and returns the mean for the specified columns.

from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
+----------+
|mean_value|
+----------+
|      20.0|
+----------+
first (), last ()
  • first() returns the first element in a column. When ignoreNulls is set to true, it returns the first non-null element.
  • last() returns the last element in a column. When ignoreNulls is set to true, it returns the last non-null element.
from pyspark.sql.functions import first, last

df.select(first("salary").alias("first"),
         last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
collect_list ( )

PySpark – collect_list() returns all values from an input column with duplicates.

from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )

PySpark – collect_set() returns all values from an input column without duplicates.

from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+

PySpark Window functions

PySpark’s Window ranking functions, like row_number(), rank(), and dense_rank(), assign sequential numbers to DataFrame rows based on specified criteria within defined partitions. These functions enable sorting and ranking operations, identifying row positions in partitions based on specific orderings.

  • row_number() assigns unique sequential numbers,
  • rank() provides the ranking with gaps,
  • dense_rank() offers ranking without gaps.
row_number ()

row_number() window function gives the sequential row number starting from 1 to the result of each window partition.

from pyspark.sql.window import Window
from pyspark.sql.functions import col,  row_number

windowSpec  = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
    .where(col("row_number") < 4).show()

+---+---------+-----+----------+
|_c0|      cut|color|row_number|
+---+---------+-----+----------+
|677|     Fair|    D|         1|
|772|     Fair|    D|         2|
|940|     Fair|    D|         3|
| 43|     Good|    D|         1|
| 44|     Good|    D|         2|
|239|     Good|    D|         3|
| 63|    Ideal|    D|         1|
| 64|    Ideal|    D|         2|
|121|    Ideal|    D|         3|
| 55|  Premium|    D|         1|
| 62|  Premium|    D|         2|
|151|  Premium|    D|         3|
| 29|Very Good|    D|         1|
| 35|Very Good|    D|         2|
| 39|Very Good|    D|         3|
+---+---------+-----+----------+
rank ()

rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()

+-----+---------+-----+-----+----+
|  _c0|      cut|color|price|rank|
+-----+---------+-----+-----+----+
|   29|Very Good|    D|  357|   1|
|28262|Very Good|    D|  357|   1|
|28272|     Good|    D|  361|   3|
|28273|Very Good|    D|  362|   4|
|28288|Very Good|    D|  367|   5|
|31598|    Ideal|    D|  367|   5|
|31601|  Premium|    D|  367|   5|
|31602|  Premium|    D|  367|   5|
|31618|Very Good|    D|  373|   9|
|34922|Very Good|    D|  373|   9|
|38277|  Premium|    D|  386|  11|
|38278|  Premium|    D|  386|  11|
|38279|  Premium|    D|  386|  11|
|38280|  Premium|    D|  386|  11|
|41581|Very Good|    D|  388|  15|
|41582|Very Good|    D|  388|  15|
dense_rank ()

dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps.

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
|  _c0|color|price|dense_rank|
+-----+-----+-----+----------+
|   29|    D|  357|         1|
|28262|    D|  357|         1|
|28272|    D|  361|         2|
|28273|    D|  362|         3|
|28288|    D|  367|         4|
|31598|    D|  367|         4|
|31601|    D|  367|         4|
|31602|    D|  367|         4|
|31618|    D|  373|         5|
|34922|    D|  373|         5|
|38277|    D|  386|         6|
|38278|    D|  386|         6|
|38279|    D|  386|         6|
percent_rank ()

percent_rank() computes the relative rank of each row within a window partition as a value between 0 and 1.

from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0  |color|price|percent_rank         |
+-----+-----+-----+---------------------+
|29   |D    |357  |0.0                  |
|28262|D    |357  |0.0                  |
|28272|D    |361  |2.952465308532625E-4 |
|28273|D    |362  |4.428697962798937E-4 |
|28288|D    |367  |5.90493061706525E-4  |
|31598|D    |367  |5.90493061706525E-4  |
|31601|D    |367  |5.90493061706525E-4  |
|31602|D    |367  |5.90493061706525E-4  |
|31618|D    |373  |0.00118098612341305  |
|34922|D    |373  |0.00118098612341305  |
|38277|D    |386  |0.0014762326542663124|
|38278|D    |386  |0.0014762326542663124|
lag (), lead ( )
  • lag ( ) function allows you to access a previous row’s value within the partition based on a specified offset.
  • lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag, lead
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
  .withColumn("lead",lead("salary",2).over(windowSpec))\
  .show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
|        Maria|   Finance|  3000|null|3900|
|        Scott|   Finance|  3300|null|null|
|          Jen|   Finance|  3900|3000|null|
|        Kumar| Marketing|  2000|null|null|
|         Jeff| Marketing|  3000|null|null|
|        James|     Sales|  3000|null|4100|
|        James|     Sales|  3000|null|4100|
|       Robert|     Sales|  4100|3000|4600|
|         Saif|     Sales|  4100|3000|null|
|      Michael|     Sales|  4600|4100|null|
+-------------+----------+------+----+----+
ntile ( )

ntile(n) distributes the rows of a window partition into n buckets and returns the bucket number (from 1 to n) for each row.

from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+

PySpark json related functions

sample data
+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )

The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.

from pyspark.sql.functions import explode
explode(col)

col: The name of the column or an expression containing an array or map to be exploded.

Returns a new row for each element in the array or each key-value pair in the map.

# Usage with Arrays:
# Sample data
data = [
    (1, ["a", "b", "c"]),
    (2, ["d", "e"]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "letters"])

# Explode the array column
exploded_df = df.select("id", explode("letters").alias("letter"))
exploded_df.show()
+---+------+
| id|letter|
+---+------+
|  1|     a|
|  1|     b|
|  1|     c|
|  2|     d|
|  2|     e|
+---+------+

# Usage with Maps:
# Sample data
data = [
    (1, {"key1": "value1", "key2": "value2"}),
    (2, {"key3": "value3"}),
    (3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])

# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+-------+
| id| key|  value|
+---+----+-------+
|  1|key1| value1|
|  1|key2| value2|
|  2|key3| value3|
+---+----+-------+

work with json

Exploding a JSON Array

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# Sample JSON data
data = [
    ('{"id": 1, "values": ["a", "b", "c"]}',),
    ('{"id": 2, "values": ["d", "e"]}',),
    ('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data                           |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]}     |
|{"id": 3, "values": []}             |
+------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, values:array<string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values   |
+---+---------+
|1  |[a, b, c]|
|2  |[d, e]   |
|3  |[]       |
+---+---------+

# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    c|
|  2|    d|
|  2|    e|
+---+-----+

Exploding a JSON Map

from pyspark.sql.types import MapType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
    ('{"id": 2, "properties": {"key3": "value3"}}',),
    ('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data                                                    |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}}                  |
|{"id": 3, "properties": {}}                                  |
+-------------------------------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties                      |
+---+--------------------------------+
|1  |{key1 -> value1, key2 -> value2}|
|2  |{key3 -> value3}                |
|3  |{}                              |
+---+--------------------------------+

# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+
  • Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
  • Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed; a minimal sketch follows below.
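A minimal sketch of the nested case, reusing the from_json, explode, and col imports from the array example above (the JSON structure, schema string, and field names here are illustrative only):

# JSON with an array of structs
nested_data = [('{"id": 1, "orders": [{"item": "a", "qty": 2}, {"item": "b", "qty": 1}]}',)]
nested_df = spark.createDataFrame(nested_data, ["json_data"])

nested_schema = "struct<id:int, orders:array<struct<item:string, qty:int>>>"

parsed = nested_df.withColumn("parsed", from_json(col("json_data"), nested_schema)) \
                  .select("parsed.*")

# Explode the array of structs, then flatten the struct fields
parsed.select("id", explode("orders").alias("order")) \
      .select("id", "order.item", "order.qty") \
      .show()
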
from_json ( )

from_json ( ): Converts JSON string into Struct type or Map type.

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object

get_json_object () Extracts JSON element from a JSON string based on json path specified.

from pyspark.sql.functions import get_json_object, col
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
json_tuple ( )

json_tuple() extracts elements from a JSON string and creates them as new columns.

from pyspark.sql.functions import json_tuple, col
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )

to_json () is used to convert MapType or StructType DataFrame columns to a JSON string.

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )

The schema_of_json ( ) function in PySpark infers the schema of a JSON string literal. It is particularly useful when you work with complex JSON data and need to define its schema for operations like parsing or transformation. It returns a string representation of the schema in DDL format.

pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) → Column

from pyspark.sql.functions import schema_of_json, lit, col

# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'), 
        ("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]

df = spark.createDataFrame(data, columns)

# Infer the schema from a sample JSON string
# (schema_of_json expects a foldable string literal, not a regular column)
sample_json = df.select("json_data").first()[0]
schema = df.select(schema_of_json(lit(sample_json))).first()[0]
print(schema)

# Example output (exact formatting varies by Spark version):
# STRUCT<age: BIGINT, name: STRING>

expr ()

PySpark expr() executes SQL-like expressions and lets you use an existing DataFrame column value as an argument to PySpark built-in functions.

from pyspark.sql.functions import expr

#Using CASE WHEN similar to SQL.
df2 = df.withColumn("gender", \
expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END") \
)
+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

#Add Month value from another column
df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+

lit ()

PySpark SQL functions lit() and typedLit() are used to add a new column to a DataFrame by assigning a literal or constant value. Both functions return a Column type. typedLit() additionally provides a way to be explicit about the data type of the constant value being added.

from pyspark.sql.functions import col, lit
df2 = df.select(col("EmpId"), col("Salary"), lit("1").alias("lit_value1"))
df2.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|  111| 50000|         1|
|  222| 60000|         1|
|  333| 40000|         1|
+-----+------+----------+
typedLit ()

typedLit() also adds a constant column, but it is only available in the Spark Scala API; there is no typedLit() in pyspark.sql.functions. It provides a way to be explicit about the data type of the constant value being added to a DataFrame.

// Scala
val df5 = df4.withColumn("lit_value3", typedLit[String]("flag"))
df5.show(false)

The difference between lit() and typedLit() is that typedLit() can also handle collection types, e.g. Seq and Map. In PySpark, a similar effect for collections can be achieved with array(), create_map(), and lit(), as sketched below.
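A minimal PySpark sketch of adding collection-typed constant columns (assuming a DataFrame df4 as above; the column names are illustrative):

from pyspark.sql.functions import lit, array, create_map

df4 = df4.withColumn("lit_array", array(lit(1), lit(2), lit(3))) \
         .withColumn("lit_map", create_map(lit("key"), lit("flag")))
df4.show(truncate=False)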


stack ( )

The stack() function is used to transform columns into rows. It is particularly useful when you have a wide DataFrame (with many columns) and want to “unpivot” or “melt” it into a longer format.

Syntax

stack(n: Int, exprs: String*): Column

Parameters

  • n: The number of rows to generate per input row. The expressions are split into n groups, and each group becomes one output row.
  • exprs: A sequence of expressions, typically label/value pairs such as 'A', A (a literal column label followed by the column value).
+---+---+---+---+
| id|  A|  B|  C|
+---+---+---+---+
|  1|100|200|300|
|  2|400|500|600|
+---+---+---+---+

# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
    "id",
    "stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|       A|  100|
|  1|       B|  200|
|  1|       C|  300|
|  2|       A|  400|
|  2|       B|  500|
|  2|       C|  600|
+---+--------+-----+

StructType & StructField

The StructType and StructField classes in PySpark are used to specify a custom schema for a DataFrame and to create complex columns like nested struct, array, and map columns. StructType is a collection of StructField objects, each of which defines a column name, a column data type, a boolean indicating whether the field can be nullable, and metadata.

Simple StructType and StructField

# Simple StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data, schema=schema)

Nested StructType object struct

# Defining schema using nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

when ()

when() is PySpark's equivalent of SQL's CASE WHEN: it evaluates a list of conditions and returns one of multiple possible result values, with otherwise() providing the default.

from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
   when(dfc.color == "F", "almost colorless")
  . when(dfc.color == "D", "colorless")
  . when(dfc.color == "G", "indistinguishable colorless")
  . when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
  .otherwise ("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color                  |
+-----+---------------------------+
|F    |almost colorless           |
|E    |very colorful              |
|D    |colorless                  |
|G    |indistinguishable colorless|
|J    |almost colorless           |
|I    |almost colorless           |
|H    |very colorful              |
|K    |very colorful              |
|L    |very colorful              |
+-----+---------------------------+

df3 = df.withColumn("new_gender", expr(
    "CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' " +
    "WHEN gender IS NULL THEN '' " +
    "ELSE gender END"))

df.createOrReplaceTempView("EMP")
spark.sql("select name, " +
    "CASE WHEN gender = 'M' THEN 'Male' " +
    "WHEN gender = 'F' THEN 'Female' " +
    "WHEN gender IS NULL THEN '' " +
    "ELSE gender END as new_gender from EMP").show()

Note: in the two examples above, the CASE WHEN string is built by concatenating its segments one by one with +.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)