PySpark Built-in Functions

PySpark provides a comprehensive library of built-in functions for performing complex transformations, aggregations, and data manipulations on DataFrames. These functions are categorized into different types based on their use cases.

Visual Summary of Categories

  • Basic Functions: alias, cast, lit, col, when, isnull, isnan
  • String Functions: concat, substring, lower, upper, trim, length, regexp_extract, split, translate, initcap
  • Date and Time Functions: current_date, datediff, to_date, year, hour, unix_timestamp, date_format
  • Mathematical Functions: abs, round, floor, sqrt, pow, exp, log, sin, cos, rand
  • Aggregation Functions: count, sum, avg, min, max, stddev, collect_list
  • Array and Map Functions: array, size, array_contains, explode, map_keys, map_values
  • Null Handling Functions: isnull, na.fill, na.drop, na.replace
  • Window Functions: row_number, rank, ntile, lag, lead, cume_dist, percent_rank
  • Statistical Functions: corr, covar_samp, approx_count_distinct, percentile_approx
  • UDF and Advanced Functions: udf, udf for SQL, pandas_udf, broadcast, schema_of_json, to_json
Sample DataFrames

df:
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
dfc:
+---+-----+-----------------------+
|id |color|current_datetime       |
+---+-----+-----------------------+
|1  |F    |2024-12-03 00:46:02.165|
|2  |E    |2024-12-03 00:46:02.165|
|3  |D    |2024-12-03 00:46:02.165|
|4  |G    |2024-12-03 00:46:02.165|
|6  |J    |2024-12-03 00:46:02.165|
|5  |I    |2024-12-03 00:46:02.165|
|7  |H    |2024-12-03 00:46:02.165|
|8  |K    |2024-12-03 00:46:02.165|
|9  |L    |2024-12-03 00:46:02.165|
+---+-----+-----------------------+

PySpark datetime related functions

PySpark provides a rich set of functions in the pyspark.sql.functions module to manipulate and analyze datetime columns.

Datetime Formatting

Common Date Format Patterns:
yyyy: Year
MM: Month
dd: Day
HH: Hour
mm: Minute
ss: Second

from pyspark.sql.functions import date_format
dfc.withColumn("formatted_date", \
date_format("current_datetime", "yyyy-MM-dd"))\
          .show(2,truncate=False)
+---+-----+-----------------------+--------------+
|id |color|current_datetime       |formatted_date|
+---+-----+-----------------------+--------------+
|1  |F    |2024-12-03 00:46:02.165|2024-12-03    |
|2  |E    |2024-12-03 00:46:02.165|2024-12-03    |
+---+-----+-----------------------+--------------+
Converting Between Types
  • to_date(column, format): Converts a string to a date.
  • unix_timestamp(column, format): Converts a string to a Unix timestamp.
from pyspark.sql.functions import to_date, unix_timestamp, current_date

dfc.withColumn("date_only", to_date(current_date())) \
    .withColumn("unix_time", unix_timestamp(current_date())) \
    .select(current_date(), "date_only", "unix_time") \
    .show(truncate=False)
+--------------+----------+----------+
|current_date()|date_only |unix_time |
+--------------+----------+----------+
|2024-12-06    |2024-12-06|1733443200|
|2024-12-06    |2024-12-06|1733443200|
  • to_timestamp(column, format): Converts a string to a timestamp; the default format is yyyy-MM-dd HH:mm:ss.
  • from_unixtime(unix_time, format): Converts a Unix timestamp to a string.
from pyspark.sql.functions import to_timestamp, from_unixtime,lit

df1 = spark.createDataFrame([("2024-12-05",)], ["string"])
+----------+
|    string|
+----------+
|2024-12-05|
+----------+

df1.select("string",to_timestamp("string")).show()
+----------+--------------------+
|    string|to_timestamp(string)|
+----------+--------------------+
|2024-12-05| 2024-12-05 00:00:00|
+----------+--------------------+

df2=df1.withColumn ("unixTimeStamp", lit(1733343200))
+----------+-------------+
|    string|unixTimeStamp|
+----------+-------------+
|2024-12-05|   1733343200|
+----------+-------------+

df2.select("unixTimeStamp",from_unixtime("unixTimeStamp")).show()
+-------------+-------------------------------------------------+
|unixTimeStamp|from_unixtime(unixTimeStamp, yyyy-MM-dd HH:mm:ss)|
+-------------+-------------------------------------------------+
|   1733343200|                              2024-12-04 20:13:20|
+-------------+-------------------------------------------------+
Datetime Arithmetic / Calculations
  • date_add ()
  • date_sub ()
  • add_months ()

from pyspark.sql.functions import col, add_months, date_add, date_sub
df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()
+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+
datediff ( )

PySpark SQL function datediff() is used to calculate the difference in days between two provided dates.

from pyspark.sql.functions import col, current_date, datediff

df2 = df.select(
      col("date"),
      current_date().alias("current_date"),
      datediff(current_date(),col("date")).alias("datediff")
    )
+----------+------------+--------+
|      date|current_date|datediff|
+----------+------------+--------+
|2019-07-01|  2024-12-06|    1985|
|2019-06-24|  2024-12-06|    1992|
|2019-08-24|  2024-12-06|    1931|
+----------+------------+--------+
months_between ( )

The PySpark SQL months_between() function returns the number of months between two dates.

from pyspark.sql.functions import col, current_date, datediff, months_between, round

df3 = df.withColumn("today", current_date())\
    .withColumn("monthsDiff", months_between(current_date(), col("date"))) \
    .withColumn("monthsDiff_round", round(months_between(current_date(), col("date")), 2))
+---+----------+----------+-----------+----------------+
| id|      date|     today| monthsDiff|monthsDiff_round|
+---+----------+----------+-----------+----------------+
|  1|2019-07-01|2024-12-06|65.16129032|           65.16|
|  2|2019-06-24|2024-12-06|65.41935484|           65.42|
|  3|2019-08-24|2024-12-06|63.41935484|           63.42|
+---+----------+----------+-----------+----------------+
Differences Between Dates in Years

Use the months_between() function to get the difference in months, then divide by 12 to convert it into years.

from pyspark.sql.functions import col, current_date, datediff, months_between, round, lit

df4 = df.withColumn("today", current_date()) \
  .withColumn("yearsDiff", months_between(current_date(), col("date")) / lit(12)) \
  .withColumn("yearsDiff_round", round(months_between(current_date(), col("date")) / lit(12), 2))

+---+----------+----------+-----------------+---------------+
| id|      date|     today|        yearsDiff|yearsDiff_round|
+---+----------+----------+-----------------+---------------+
|  1|2019-07-01|2024-12-06|5.430107526666667|           5.43|
|  2|2019-06-24|2024-12-06|5.451612903333333|           5.45|
|  3|2019-08-24|2024-12-06|5.284946236666666|           5.28|
+---+----------+----------+-----------------+---------------+
timediff ( )

timediff(column1, column2): Calculates the difference between two timestamps.

trunc ( )

trunc(column, format) truncates a date to the specified unit (month or year), setting it to the first day of that month or year.
e.g. for 2024-10-08: truncate to month –> 2024-10-01; truncate to year –> 2024-01-01

df.select(col("input"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()
+----------+----------+-----------+
|     input|Month_Year|Month_Trunc|
+----------+----------+-----------+
|2024-12-01|2024-01-01| 2024-12-01|
|2023-10-11|2023-01-01| 2023-10-01|
|2022-09-17|2022-01-01| 2022-09-01|
+----------+----------+-----------+
interval

interval is used for advanced time calculations. It is not exposed as a standalone DataFrame function, but it works through Spark SQL expressions (for example with expr()), as in the sketch below.
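
A minimal sketch of interval arithmetic via expr(), assuming a DataFrame df with a date column named input (as in the examples above):

from pyspark.sql.functions import expr

df.select(
    "input",
    expr("input + INTERVAL 1 DAY").alias("plus_1_day"),
    expr("input - INTERVAL 2 MONTHS").alias("minus_2_months")
).show()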


Extracting Components from a Datetime
  • year(column): Extracts the year.
  • quarter(column): Returns the quarter as an integer from a given date or timestamp.
  • dayofyear(column): Extracts the day of the year from a given date or timestamp.
  • dayofmonth(column): Extracts the day of the month.
  • dayofweek(column): Returns the day of the week (1 = Sunday, 7 = Saturday).
  • weekofyear(column): Returns the week number of the year.
  • last_day(column): Returns the last day of the month for a given date or timestamp column. The result is a date column where each date corresponds to the last day of the month for the original dates in the specified column.
  • next_day(column, day_of_week): Returns the first date later than the given date that falls on the specified day of the week (e.g. "Mon", "Sunday").
  • hour(column): Extracts the hour.
  • minute(column): Extracts the minute.
  • second(column): Extracts the second.
from pyspark.sql.functions import year, quarter,month, dayofmonth, weekofyear, hour, minute,second

df.withColumn ("year", year("input"))\
.withColumn ("quarter", quarter("input"))\
.withColumn ("month", month("input"))\
.withColumn ("hour", hour("input"))\
.withColumn ("minute", minute("input"))\
.withColumn ("second", second("input"))\
.drop("id","color").show(3,truncate=False)
+-----------------------+----+-------+-----+----+------+------+
|input                  |year|quarter|month|hour|minute|second|
+-----------------------+----+-------+-----+----+------+------+
|2024-01-01 02:46:02.75 |2024|1      |1    |2   |46    |2     |
|2023-01-11 15:35:32.265|2023|1      |1    |15  |35    |32    |
|2022-09-17 22:16:02.186|2022|3      |9    |22  |16    |2     |
+-----------------------+----+-------+-----+----+------+------+

from pyspark.sql.functions import year, month,dayofyear, dayofmonth,dayofweek, weekofyear,hour, minute,second
from pyspark.sql.functions import next_day, last_day,date_format

df.select(date_format("input","yyyy-MM-dd").alias("input"), 
    dayofweek("input").alias('dayofweek'), 
     dayofmonth("input").alias('dayofmonth'),
     weekofyear("input").alias("weekofyear") ,
     next_day("input","mon").alias("nextday"),
     last_day("input").alias('lastday')
  ).show()
+----------+---------+----------+----------+----------+----------+
|     input|dayofweek|dayofmonth|weekofyear|   nextday|   lastday|
+----------+---------+----------+----------+----------+----------+
|2024-01-01|        2|         1|         1|2024-01-08|2024-01-31|
|2023-01-11|        4|        11|         2|2023-01-16|2023-01-31|
|2022-09-17|        7|        17|        37|2022-09-19|2022-09-30|
+----------+---------+----------+----------+----------+----------+
Current date and timestamp – current_date, current_timestamp
  • current_date (),
  • current_timestamp ()
from pyspark.sql.functions import current_date, current_timestamp

dfc.withColumn ("current_date", current_date())\
    .withColumn ("current_timestamp", current_timestamp())\
    .select("current_date","current_timestamp").show(truncate=False)
+------------+-----------------------+
|current_date|current_timestamp      |
+------------+-----------------------+
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
|2024-12-06  |2024-12-06 14:33:24.419|
+------------+-----------------------+

PySpark string related functions

btrim (str[, trim] ), trim ( )

btrim(str[, trim]): Removes the specified trim characters from the beginning and end of the string str.

  • trim (): Removes only whitespace from the beginning and end of a string.
    when you only need to clean up whitespace from strings.
  • btrim(str[, trim]): Removes all specified leading and trailing characters from a string.
    when you need to remove specific characters (e.g., punctuation, symbols).
from pyspark.sql.functions import btrim, trim, lit
+-----------+
|       text|
+-----------+
|   hello   |
| !!spark!! |
|**PySpark**|
+-----------+
df.withColumn("trimmed", trim("text")).show()
+-----------+-----------+
|       text|    trimmed|
+-----------+-----------+
|   hello   |      hello|
| !!spark!! |  !!spark!!|
|**PySpark**|**PySpark**|
+-----------+-----------+
# btrim's trim argument is a column expression, so wrap the character set in lit()
df.withColumn("trimmed_custom", btrim("text", lit(" !*"))).show()
+-----------+--------------+
|       text|trimmed_custom|
+-----------+--------------+
|   hello   |         hello|
| !!spark!! |         spark|
|**PySpark**|       PySpark|
+-----------+--------------+
concat ( ) , concat_ws ()

Both concatenate multiple string columns or expressions into a single string column; concat_ws() additionally inserts a specified delimiter between the values.

  • concat ( ) : No delimiter is added between the concatenated values.
    when you need strict concatenation without any delimiters.
  • concat_ws (): with a specified delimiter between the values.
    when you need a delimiter or want to ignore NULL values.
from pyspark.sql.functions import concat, concat_ws, lit

df.withColumn("full_name", concat(df.first_name, lit(" "), df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|      null|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+

df.withColumn("full_name", concat_ws(" ", df.first_name, df.last_name)).show()
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
|      John|      Doe|  John Doe|
|     Alice|     null|     Alice|
|       Bob|    Smith| Bob Smith|
+----------+---------+----------+
concat_ws() skips null values; concat() returns null if any input is null.
endswith ( )

endswith() checks whether a string column ends with the given suffix and returns a boolean column.

+--------------+
|          text|
+--------------+
|   hello world|
|PySpark is fun|
|       welcome|
+--------------+
df.select("text",(col("text").endswith("fun")).alias("end_with?")).show ()
+--------------+---------+
|          text|end_with?|
+--------------+---------+
|   hello world|    false|
|PySpark is fun|     true|
|       welcome|    false|
+--------------+---------+

df_filtered = df.filter(df["text"].endswith("fun"))
+--------------+
|          text|
+--------------+
|PySpark is fun|
+--------------+
contains ( )

contains () checks whether a PySpark DataFrame column contains a specific string.

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("full_name",(col("full_name").contains("Smith")).alias("contain?")).show ()
+--------------+--------+
|     full_name|contain?|
+--------------+--------+
|      John Doe|   false|
|    Jane Smith|    true|
|Robert Johnson|   false|
+--------------+--------+

substring_to_check = "Smith"
df.filter(col("full_name").contains(substring_to_check)).show()
+----------+
| full_name|
+----------+
|Jane Smith|
+----------+
find_in_set ( )

find_in_set(str, str_array): Provides the 1-based index of the specified string (str) within the comma-delimited list (str_array); returns 0 if the string is not found.
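
A minimal sketch (assuming Spark 3.5+, where find_in_set() is exposed in pyspark.sql.functions; the toy DataFrame below is made up for illustration):

from pyspark.sql.functions import find_in_set, col

df_set = spark.createDataFrame([("b", "a,b,c"), ("z", "a,b,c")], ["item", "items"])
df_set.select("item", "items",
              find_in_set(col("item"), col("items")).alias("position")).show()
# position is 2 for "b" and 0 for "z" (not found)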

length ( )

length ( ) Provides the length of characters for string data or the number of bytes for binary data.

from pyspark.sql.functions import length

df_with_length = df.withColumn("char_length", length("text"))
df_with_length.show()
+----------+-----------+
|      text|char_length|
+----------+-----------+
|     hello|          5|
|   PySpark|          7|
|Databricks|         10|
+----------+-----------+
like ( )

like ( ) performs SQL LIKE pattern matching on a column (% matches any sequence of characters). Combine multiple conditions with the & and | operators in PySpark (&& and || in Scala).

+--------------+
|     full_name|
+--------------+
|      John Doe|
|    Jane Smith|
|Robert Johnson|
+--------------+
df.select("name", (col("name").like("R%")).alias("R%")
,(col("name").like("%th")).alias("%th")
,(col("name").like("%John%")).alias("%John%")
).show()
+--------------+-----+-----+------+
|          name|   R%|  %th|%John%|
+--------------+-----+-----+------+
|      John Doe|false|false|  true|
|    Jane Smith|false| true| false|
|Robert Johnson| true|false|  true|
+--------------+-----+-----+------+
df.filter(col("name").like("R%")).show()
+--------------+
|          name|
+--------------+
|Robert Johnson|
+--------------+
startswith ( )

startswith() checks whether a string column begins with the given prefix and returns a boolean, typically used with filter().

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

df.filter(df.firstname.startswith("M")).show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+
substring (), substr ( )

substring (str, pos[, len]): Returns the substring of str that starts at pos and is of length len

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
+----------+-----+---+------+
from pyspark.sql.functions import substring

df1.select(df1.fname.substr(1,2).alias("substr"), substring(df1.fname, 1,2).alias("substring")).show()
+------+---------+
|substr|substring|
+------+---------+
|    Ja|       Ja|
|    An|       An|
|    To|       To|
|    To|       To|
+------+---------+
split ( )

split() splits a string column into an array of substrings based on a delimiter; the resulting array can then be expanded into multiple columns.

from pyspark.sql.functions import split

df_with_split = df.select("full_name", split(df["full_name"], ",").alias("split_names"))
df_with_split.show()
+--------------+-----------------+
|     full_name|      split_names|
+--------------+-----------------+
|      John,Doe|      [John, Doe]|
|    Jane,Smith|    [Jane, Smith]|
|Robert,Johnson|[Robert, Johnson]|
+--------------+-----------------+

split_columns = split(df["full_name"], ",")
df_with_split = df.withColumn("first_name", split_columns[0]).withColumn("last_name", split_columns[1])
df_with_split.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+

df_expanded = df.select(
    "full_name",
    split(df["full_name"], ",").getItem(0).alias("first_name"),
    split(df["full_name"], ",").getItem(1).alias("last_name")
)
df_expanded.show()
+--------------+----------+---------+
|     full_name|first_name|last_name|
+--------------+----------+---------+
|      John,Doe|      John|      Doe|
|    Jane,Smith|      Jane|    Smith|
|Robert,Johnson|    Robert|  Johnson|
+--------------+----------+---------+
translate ( )

translate() replaces characters in a DataFrame column value character by character, mapping each character in the matching string to the corresponding character in the replacement string.

from pyspark.sql.functions import translate
# d is a DataFrame with color and new_color columns (see the when() example later in this post)
d.withColumn("replaced", translate("new_color", "aoml", "A0N_")).show(3)
+-----+----------------+----------------+
|color|       new_color|        replaced|
+-----+----------------+----------------+
|    F|almost colorless|A_N0st c0_0r_ess|
|    E|            null|            null|
|    D|       colorless|       c0_0r_ess|
+-----+----------------+----------------+
regexp_replace ( )

regexp_replace() replaces substrings in a column value that match a regular expression with another string.

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
+---+------------------+-----+

from pyspark.sql.functions import regexp_replace

df.withColumn('address', regexp_replace('address', 'Rd', 'Road')) \
  .show(truncate=False)
+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
+---+------------------+-----+
from pyspark.sql.functions import when
df.withColumn('address', 
    when(df.address.endswith('Rd'),regexp_replace(df.address,'Rd','Road')) \
   .when(df.address.endswith('St'),regexp_replace(df.address,'St','Street')) \
   .when(df.address.endswith('Ave'),regexp_replace(df.address,'Ave','Avenue')) \
   .otherwise(df.address)) \
   .show(truncate=False)
+---+----------------------+-----+
|id |address               |state|
+---+----------------------+-----+
|1  |14851 Jeffrey Road    |DE   |
|2  |43421 Margarita Street|NY   |
|3  |13111 Siemon Avenue   |CA   |
+---+----------------------+-----+
overlay ( )

overlay() replaces a portion of a string column with another string, starting at the specified position.

+---------------+----+
|           col1|col2|
+---------------+----+
|ABCDE_123486789| FGH|
+---------------+----+
from pyspark.sql.functions import overlay

df.select(overlay("col1", "col2", 7).alias("overlayed")).show()
+---------------+
|      overlayed|
+---------------+
|ABCDE_FGH486789|
+---------------+
upper ( ), lower ( ), initcap ( )
  • upper: Converts all characters in the column to uppercase.
  • lower: Converts all characters in the column to lowercase.
  • initcap: Converts the first letter of each word to uppercase and the rest to lowercase
+-----------------+
|             text|
+-----------------+
|      hello world|
|spark sql example|
|  UPPER and LOWER|
+-----------------+
from pyspark.sql.functions import upper, lower, initcap

df.withColumn("Uppercase", upper("text"))\
   .withColumn("lowercase", lower("text"))\
    .withColumn("Capitalized", initcap("text"))\
    .show()
+-----------------+-----------------+-----------------+-----------------+
|             text|        Uppercase|        lowercase|      Capitalized|
+-----------------+-----------------+-----------------+-----------------+
|      hello world|      HELLO WORLD|      hello world|      Hello World|
|spark sql example|SPARK SQL EXAMPLE|spark sql example|Spark Sql Example|
|  UPPER and LOWER|  UPPER AND LOWER|  upper and lower|  Upper And Lower|
+-----------------+-----------------+-----------------+-----------------+

Numeric Functions

Mathematical operations on numeric columns.

abs()

abs(): Absolute value.

from pyspark.sql.functions import abs
df.select(abs(df["column"]))
round ( )

round(): Round to a specific number of decimals.

from pyspark.sql.functions import round
df.select(round(df["column"], 2))
pow ( )

pow(): Power function.

from pyspark.sql.functions import pow
df.select(pow(df["column"], 2))
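
A short combined sketch of a few of these numeric functions on a toy DataFrame (the value column here is made up for illustration):

from pyspark.sql.functions import abs, round, floor, sqrt, pow

df_num = spark.createDataFrame([(-2.7,), (3.1,), (16.0,)], ["value"])
df_num.select(
    "value",
    abs("value").alias("abs"),
    round("value", 1).alias("round"),
    floor("value").alias("floor"),
    sqrt(abs("value")).alias("sqrt_abs"),
    pow("value", 2).alias("squared")
).show()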

Aggregate Functions

sample df
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
distinct, countDistinct, approx_count_distinct
  • approx_count_distinct ( ): returns an approximate count of distinct items in a group (faster on large data)
  • countDistinct ( ): returns the exact count of distinct items in a group
  • distinct ( ): returns the distinct rows of a DataFrame
from pyspark.sql.functions import approx_count_distinct, countDistinct

df.select(approx_count_distinct("salary").alias("approx_count_distinct"), 
          countDistinct("salary").alias("countDistinct"),
          ).show()
+---------------------+-------------+
|approx_count_distinct|countDistinct|
+---------------------+-------------+
|                    6|            6|
+---------------------+-------------+

df.select("salary").distinct().show()
+------+
|salary|
+------+
|  3000|
|  4600|
|  4100|
|  3300|
|  3900|
|  2000|
+------+
avg, sum, sumDistinct, max (), min (), mean ( )
  • avg ( ): average of values in the input column
  • sum ( ): sum of all values in the input column
  • sumDistinct ( ): returns the sum of all distinct values in a column
  • max ( ): maximum value in the input column
  • min ( ): minimum value in the input column
  • mean ( ): alias for avg()
from pyspark.sql.functions import avg,sum, max, min

df.select(avg("salary").alias("avg"),
    sum("salary").alias("sum"),
    max("salary").alias("max"),
    min("salary").alias("min")
    ).show()
+------+-----+----+----+
|   avg|  sum| max| min|
+------+-----+----+----+
|3400.0|34000|4600|2000|
+------+-----+----+----+

from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("salary")).show(truncate=False)
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+

from pyspark.sql.functions import mean
df.select(mean("value").alias("mean_value")).show()
sample df
+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  2|   20|
|  3|   30|
|  4| null|
+---+-----+
mean() specifically calculates the mean. It is used with select() or groupBy() and returns the mean for the specified columns.
+----------+
|mean_value|
+----------+
|      20.0|
+----------+
first (), last ()
  • first() returns the first element in a column. When ignoreNulls is set to true, it returns the first non-null element.
  • last() returns the last element in a column. when ignoreNulls is set to true, it returns the last non-null element.
from pyspark.sql.functions import first, last

df.select(first("salary").alias("first"),
         last("salary").alias("last"))\
.show(truncate=False)
+-----+----+
|first|last|
+-----+----+
|3000 |4100|
+-----+----+
collect_list ( )

PySpark – collect_list() returns all values from an input column with duplicates.

from pyspark.sql.functions import collect_list
df.select(collect_list("salary")).show(truncate=False)
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+
collect_set ( )

PySpark – collect_set() returns all values from an input column without duplicates.

from pyspark.sql.functions import collect_set
df.select(collect_set("salary")).show(truncate=False)
+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+

PySpark Window functions

PySpark’s Window ranking functions, like row_number(), rank(), and dense_rank(), assign sequential numbers to DataFrame rows based on specified criteria within defined partitions. These functions enable sorting and ranking operations, identifying row positions in partitions based on specific orderings.

  • row_number() assigns unique sequential numbers,
  • rank() provides the ranking with gaps,
  • dense_rank() offers ranking without gaps.
row_number ()

row_number() window function gives the sequential row number starting from 1 to the result of each window partition.

from pyspark.sql.window import Window
from pyspark.sql.functions import col,  row_number

windowSpec  = Window.partitionBy("cut").orderBy("color")
df.select("_c0","cut","color").withColumn("row_number",row_number().over(windowSpec)) \
    .where(col("row_number") < 4).show()

+---+---------+-----+----------+
|_c0|      cut|color|row_number|
+---+---------+-----+----------+
|677|     Fair|    D|         1|
|772|     Fair|    D|         2|
|940|     Fair|    D|         3|
| 43|     Good|    D|         1|
| 44|     Good|    D|         2|
|239|     Good|    D|         3|
| 63|    Ideal|    D|         1|
| 64|    Ideal|    D|         2|
|121|    Ideal|    D|         3|
| 55|  Premium|    D|         1|
| 62|  Premium|    D|         2|
|151|  Premium|    D|         3|
| 29|Very Good|    D|         1|
| 35|Very Good|    D|         2|
| 39|Very Good|    D|         3|
+---+---------+-----+----------+
rank ()

rank() window function provides a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec= Window.partitionBy("color").orderBy("price")
df.withColumn("rank",rank().over(windowSpec))\
.select("_c0","cut","color","price","rank").show()

+-----+---------+-----+-----+----+
|  _c0|      cut|color|price|rank|
+-----+---------+-----+-----+----+
|   29|Very Good|    D|  357|   1|
|28262|Very Good|    D|  357|   1|
|28272|     Good|    D|  361|   3|
|28273|Very Good|    D|  362|   4|
|28288|Very Good|    D|  367|   5|
|31598|    Ideal|    D|  367|   5|
|31601|  Premium|    D|  367|   5|
|31602|  Premium|    D|  367|   5|
|31618|Very Good|    D|  373|   9|
|34922|Very Good|    D|  373|   9|
|38277|  Premium|    D|  386|  11|
|38278|  Premium|    D|  386|  11|
|38279|  Premium|    D|  386|  11|
|38280|  Premium|    D|  386|  11|
|41581|Very Good|    D|  388|  15|
|41582|Very Good|    D|  388|  15|
dense_rank ()

The dense_rank() window function assigns ranks to rows within a window partition without any gaps.

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("_c0","color","price","dense_rank").show()
+-----+-----+-----+----------+
|  _c0|color|price|dense_rank|
+-----+-----+-----+----------+
|   29|    D|  357|         1|
|28262|    D|  357|         1|
|28272|    D|  361|         2|
|28273|    D|  362|         3|
|28288|    D|  367|         4|
|31598|    D|  367|         4|
|31601|    D|  367|         4|
|31602|    D|  367|         4|
|31618|    D|  373|         5|
|34922|    D|  373|         5|
|38277|    D|  386|         6|
|38278|    D|  386|         6|
|38279|    D|  386|         6|
percent_rank ()

percent_rank() returns the relative rank of a row within a window partition as a value between 0 and 1, computed as (rank - 1) / (number of rows in the partition - 1).

from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("color").orderBy("price")
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .select("_c0","color","price","percent_rank").show(truncate=False)
+-----+-----+-----+---------------------+
|_c0  |color|price|percent_rank         |
+-----+-----+-----+---------------------+
|29   |D    |357  |0.0                  |
|28262|D    |357  |0.0                  |
|28272|D    |361  |2.952465308532625E-4 |
|28273|D    |362  |4.428697962798937E-4 |
|28288|D    |367  |5.90493061706525E-4  |
|31598|D    |367  |5.90493061706525E-4  |
|31601|D    |367  |5.90493061706525E-4  |
|31602|D    |367  |5.90493061706525E-4  |
|31618|D    |373  |0.00118098612341305  |
|34922|D    |373  |0.00118098612341305  |
|38277|D    |386  |0.0014762326542663124|
|38278|D    |386  |0.0014762326542663124|
lag (), lead ( )
  • lag ( ) function allows you to access a previous row’s value within the partition based on a specified offset.
  • lead ( ) function retrieves the column value from the following row within the partition based on a specified offset.
from pyspark.sql.functions import lag,lead

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
  .withColumn("lead",lead("salary",2).over(windowSpec))\
  .show()
+-------------+----------+------+----+----+
|employee_name|department|salary| lag|lead|
+-------------+----------+------+----+----+
|        Maria|   Finance|  3000|null|3900|
|        Scott|   Finance|  3300|null|null|
|          Jen|   Finance|  3900|3000|null|
|        Kumar| Marketing|  2000|null|null|
|         Jeff| Marketing|  3000|null|null|
|        James|     Sales|  3000|null|4100|
|        James|     Sales|  3000|null|4100|
|       Robert|     Sales|  4100|3000|4600|
|         Saif|     Sales|  4100|3000|null|
|      Michael|     Sales|  4600|4100|null|
+-------------+----------+------+----+----+
ntile ( )

ntile(n) divides the rows of a window partition into n roughly equal buckets and returns the bucket number (from 1 to n) for each row.

from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

windowSpec  = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()
+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+

PySpark json related functions

sample data
+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+
explode ( )

The explode() function in PySpark is used to transform an array or map column into multiple rows. Each element of the array or each key-value pair in the map becomes a separate row.

from pyspark.sql.functions import explode
explode(col)

col: The name of the column or an expression containing an array or map to be exploded.

Returns a new row for each element in the array or each key-value pair in the map.

# Usage with Arrays:
# Sample data
data = [
    (1, ["a", "b", "c"]),
    (2, ["d", "e"]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "letters"])

# Explode the array column
exploded_df = df.select("id", explode("letters").alias("letter"))
exploded_df.show()
+---+------+
| id|letter|
+---+------+
|  1|     a|
|  1|     b|
|  1|     c|
|  2|     d|
|  2|     e|
+---+------+

# Usage with Maps:
# Sample data
data = [
    (1, {"key1": "value1", "key2": "value2"}),
    (2, {"key3": "value3"}),
    (3, {})
]
df = spark.createDataFrame(data, ["id", "properties"])

# Explode the map column
exploded_df = df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+-------+
| id| key|  value|
+---+----+-------+
|  1|key1| value1|
|  1|key2| value2|
|  2|key3| value3|
+---+----+-------+

work with json

Exploding a JSON Array

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import ArrayType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "values": ["a", "b", "c"]}',),
    ('{"id": 2, "values": ["d", "e"]}',),
    ('{"id": 3, "values": []}',)
]
df = spark.createDataFrame(data, ["json_data"])
+------------------------------------+
|json_data                           |
+------------------------------------+
|{"id": 1, "values": ["a", "b", "c"]}|
|{"id": 2, "values": ["d", "e"]}     |
|{"id": 3, "values": []}             |
+------------------------------------+

# Define schema for the JSON column
json_schema = "struct<id:int, values:array<string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+---------+
|id |values   |
+---+---------+
|1  |[a, b, c]|
|2  |[d, e]   |
|3  |[]       |
+---+---------+

# Explode the array column
exploded_df = parsed_df.select("id", explode("values").alias("value"))
exploded_df.show()
+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  1|    b|
|  1|    c|
|  2|    d|
|  2|    e|
+---+-----+

Exploding a JSON Map

from pyspark.sql.types import MapType, StringType

# Sample JSON data
data = [
    ('{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}',),
    ('{"id": 2, "properties": {"key3": "value3"}}',),
    ('{"id": 3, "properties": {}}',)
]
df = spark.createDataFrame(data, ["json_data"])
+-------------------------------------------------------------+
|json_data                                                    |
+-------------------------------------------------------------+
|{"id": 1, "properties": {"key1": "value1", "key2": "value2"}}|
|{"id": 2, "properties": {"key3": "value3"}}                  |
|{"id": 3, "properties": {}}                                  |
+-------------------------------------------------------------+

# Define schema for JSON column
json_schema = "struct<id:int, properties:map<string, string>>"

# Parse JSON column
parsed_df = df.withColumn("parsed_data", from_json(col("json_data"), json_schema))
parsed_df = parsed_df.select("parsed_data.*")  # Expand the struct
parsed_df.show(truncate=False)
+---+--------------------------------+
|id |properties                      |
+---+--------------------------------+
|1  |{key1 -> value1, key2 -> value2}|
|2  |{key3 -> value3}                |
|3  |{}                              |
+---+--------------------------------+

# Explode the map column
exploded_df = parsed_df.select("id", explode("properties").alias("key", "value"))
exploded_df.show()
+---+----+------+
| id| key| value|
+---+----+------+
|  1|key1|value1|
|  1|key2|value2|
|  2|key3|value3|
+---+----+------+
  • Empty Arrays or Maps: Rows with empty arrays or maps in the JSON will not generate any rows after the explode operation.
  • Complex JSON Structures: For deeply nested JSON structures, use nested from_json and explode calls as needed (see the sketch below).
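
A brief sketch of the nested case (the JSON layout and column names here are made up for illustration): parse the outer struct first, then explode the inner array of structs.

nested_data = [('{"id": 1, "orders": [{"sku": "a", "qty": 2}, {"sku": "b", "qty": 1}]}',)]
nested_df = spark.createDataFrame(nested_data, ["json_data"])

# Parse the outer struct, then explode the inner array
nested_schema = "struct<id:int, orders:array<struct<sku:string, qty:int>>>"
parsed = nested_df.withColumn("parsed", from_json(col("json_data"), nested_schema)).select("parsed.*")
parsed.select("id", explode("orders").alias("order")) \
      .select("id", "order.sku", "order.qty").show()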
from_json ( )

from_json ( ): Converts JSON string into Struct type or Map type.

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+
get_json_object

get_json_object() extracts a JSON element from a JSON string based on the specified JSON path.

from pyspark.sql.functions import get_json_object, col
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)
+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
json_tuple ( )

json_tuple() extracts data from a JSON string and creates new columns from the extracted values.

from pyspark.sql.functions import json_tuple, col
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)
+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+
to_json ( )

to_json() is used to convert DataFrame columns of MapType or StructType to a JSON string.

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)
+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
schema_of_json ( )

The schema_of_json() function in PySpark is used to infer the schema of a JSON string literal. It is particularly useful when you want to work with complex JSON data and need to define its schema for operations like parsing or transformation. It returns a string representation of the schema in DDL format.

pyspark.sql.functions.schema_of_json(json: Union[ColumnOrName, str], options: Optional[Dict[str, str]] = None) → Column

from pyspark.sql.functions import schema_of_json, lit

# Sample DataFrame with JSON strings
data = [("1", '{"name": "Alice", "age": 30}'), 
        ("2", '{"name": "Bob", "age": 25}')]
columns = ["id", "json_data"]

df = spark.createDataFrame(data, columns)

# Infer the schema from a sample JSON string
# (schema_of_json expects a literal/foldable JSON string, not a per-row column)
schema = df.select(schema_of_json(lit('{"name": "Alice", "age": 30}'))).first()[0]
print(schema)
# e.g. STRUCT<age: BIGINT, name: STRING>

expr ()

PySpark expr() is a SQL function that executes SQL-like expressions and lets you use an existing DataFrame column value as an expression argument to PySpark built-in functions.
#Using CASE WHEN similar to SQL.
from pyspark.sql.functions import expr
df2 = df.withColumn("gender", \
expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' ELSE 'unknown' END") \
)
+-------+-------+
|   name| gender|
+-------+-------+
|  James|   Male|
|Michael| Female|
|    Jen|unknown|
+-------+-------+

#Add Month value from another column
df.select(df.date,df.increment,
     expr("add_months(date,increment)")
  .alias("inc_date")).show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+

lit ()

PySpark SQL function lit() (and its Scala counterpart typedLit()) is used to add a new column to a DataFrame by assigning a literal or constant value. Both return a Column type. typedLit() additionally provides a way to be explicit about the data type of the constant value being added to a DataFrame.

from pyspark.sql.functions import col,lit
df2 = df.select(col("EmpId"),col("Salary"),lit("1").alias("lit_value1"))
df2.show(truncate=False)
+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|  111| 50000|         1|
|  222| 60000|         1|
|  333| 40000|         1|
+-----+------+----------+
typedLit ()

typedLit() also adds a constant column to a DataFrame, but provides a way to be explicit about the data type of the constant value. The difference between lit() and typedLit() is that typedLit() can handle collection types, e.g. Array, Dictionary (map), etc. Note that typedLit() belongs to the Scala/Java API and is not exposed in pyspark.sql.functions, so the snippet below is illustrative rather than runnable PySpark; a rough PySpark equivalent is sketched after it.

# typedLit is Scala-only; this line will not run in PySpark as-is
# df4 = df4.withColumn("lit_value3", typedLit("flag", StringType()))
# df4.show(truncate=False)
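
A rough PySpark equivalent for collection constants, using array() and create_map() in place of typedLit() (df4 here is assumed to be any existing DataFrame):

from pyspark.sql.functions import lit, array, create_map

df5 = df4.withColumn("lit_array", array(lit(1), lit(2), lit(3))) \
         .withColumn("lit_map", create_map(lit("flag"), lit("Y")))
df5.show(truncate=False)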


stack ( )

The stack() function is used to transform columns into rows. It’s particularly useful when you have a wide DataFrame (with many columns) and want to “unpivot” or “melt” it into a longer format.

Syntax

stack(n: Int, exprs: String*): Column

Parameters

  • n: The number of rows to generate for each input row; the expressions in exprs are split into n groups, one group per output row.
  • exprs: A sequence of expressions, typically alternating a label and a column value, e.g. 'A', A, 'B', B.
+---+---+---+---+
| id|  A|  B|  C|
+---+---+---+---+
|  1|100|200|300|
|  2|400|500|600|
+---+---+---+---+

# Unpivot columns A, B, C into rows
unpivoted_df = df.selectExpr(
    "id",
    "stack(3, 'A', A, 'B', B, 'C', C) as (variable, value)"
)
unpivoted_df.show()
+---+--------+-----+
| id|variable|value|
+---+--------+-----+
|  1|       A|  100|
|  1|       B|  200|
|  1|       C|  300|
|  2|       A|  400|
|  2|       B|  500|
|  2|       C|  600|
+---+--------+-----+

The StructType and StructField classes in PySpark are used to specify the custom schema to the DataFrame and create complex columns like nested struct, array, and map columns.  StructType is a collection of StructField objects that define column name, column data type, boolean to specify if the field can be nullable or not, and metadata.

StructType & StructField

Simple StructType and StructField

# Simple StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data, schema=schema)

Nested StructType object struct

# Defining schema using nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

When ()

when() works like CASE WHEN in SQL (and if/else in programming languages): it evaluates a list of conditions and returns one of several possible result values.

from pyspark.sql.functions import when
dfc.select("color").withColumn("new_color",
   when(dfc.color == "F", "almost colorless")
  . when(dfc.color == "D", "colorless")
  . when(dfc.color == "G", "indistinguishable colorless")
  . when((dfc.color == "I") | (dfc.color == "J"), "almost colorless")
  .otherwise ("very colorful")
).show(truncate=False)
+-----+---------------------------+
|color|new_color                  |
+-----+---------------------------+
|F    |almost colorless           |
|E    |very colorful              |
|D    |colorless                  |
|G    |indistinguishable colorless|
|J    |almost colorless           |
|I    |almost colorless           |
|H    |very colorful              |
|K    |very colorful              |
|L    |very colorful              |
+-----+---------------------------+

df3 = df.withColumn("new_gender", expr(
     "CASE WHEN gender = 'M' THEN 'Male' " +
     "WHEN gender = 'F' THEN 'Female' " +
     "WHEN gender IS NULL THEN '' " +
     "ELSE gender END"))

df.createOrReplaceTempView("EMP")
spark.sql("select name, " +
     "CASE WHEN gender = 'M' THEN 'Male' " +
     "WHEN gender = 'F' THEN 'Female' " +
     "WHEN gender IS NULL THEN '' " +
     "ELSE gender END as new_gender from EMP").show()

Note: in both examples above, the CASE WHEN expression is built by concatenating the string segments one by one.


from_json(), to_json()

from_json()

from_json() is a function used to parse a JSON string into a structured DataFrame format (such as StructType, ArrayType, etc.). It is commonly used to deserialize JSON strings stored in a DataFrame column into complex types that PySpark can work with more easily.

Syntax

from_json(column, schema, options={})

Parameters

column: The column containing the JSON string. Can be a string that refers to the column name or a column object.

schema

Specifies the schema of the expected JSON structure.
Can be a StructType (or other types like ArrayType depending on the JSON structure).

options

  • allowUnquotedFieldNames: Allows field names without quotes. (default: false)
  • allowSingleQuotes: Allows parsing single-quoted JSON strings. (default: true)
  • allowNumericLeadingZeros: Allows leading zeros in numbers. (default: false)
  • allowBackslashEscapingAnyCharacter: Allows escaping any character with a backslash. (default: false)
  • mode: Controls how to handle malformed records
    PERMISSIVE: The default mode that sets null values for corrupted fields.
    DROPMALFORMED: Discards rows with malformed JSON strings.
    FAILFAST: Fails the query if any malformed records are found.
Sample DF
+--------------------------------------------------------+
|json_string                                             |
+--------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|
+--------------------------------------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col
# Define the schema for the nested JSON
schema = StructType([
    StructField("name", StructType([
        StructField("first", StringType(), True),
        StructField("last", StringType(), True)
    ]), True),
    StructField("age", IntegerType(), True)
])

# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))

# Display the parsed JSON
df_parsed.select("parsed_json.*").show(truncate=False)
+--------------+---+
|name          |age|
+--------------+---+
|{John, Doe}   |30 |
|{Alice, Smith}|25 |
+--------------+---+
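
A small sketch of passing parser options to from_json(), continuing from the example above (using just the mode option):

# Fail the query on malformed JSON instead of silently producing nulls
df_strict = df.withColumn(
    "parsed_json",
    from_json(col("json_string"), schema, {"mode": "FAILFAST"})
)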

to_json()

to_json() is a function that converts a structured column (such as one of type StructType, ArrayType, etc.) into a JSON string.

Syntax

to_json(column, options={})

Parameters

column: The column you want to convert into a JSON string.
The column should be of a complex data type, such as StructType, ArrayType, or MapType.
Can be a column name (as a string) or a Column object.

options

  • pretty: If set to true, it pretty-prints the JSON output.
  • dateFormat: Specifies the format for DateType and TimestampType columns (default: yyyy-MM-dd).
  • timestampFormat: Specifies the format for TimestampType columns (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
  • ignoreNullFields: When set to true, null fields are omitted from the resulting JSON string (default: true).
  • compression: Controls the compression codec used to compress the JSON output, e.g., gzip, bzip2.
sample data
+--------------------------------------------------------+
|json_string                                             |
+--------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|
+--------------------------------------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, to_json, col

# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))
df_parsed.show(truncate=False)

+--------------------------------------------------------+--------------------+
|json_string                                             |parsed_json         |
+--------------------------------------------------------+--------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |{{John, Doe}, 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|{{Alice, Smith}, 25}|
+--------------------------------------------------------+--------------------+
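
To complete the round trip, a minimal sketch that converts the parsed struct back into a JSON string with to_json(), continuing from df_parsed above:

# Convert the parsed struct back into a JSON string
df_parsed.select(to_json(col("parsed_json")).alias("json_out")).show(truncate=False)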


Comparison: split (), concat (), arrays_zip (), and explode ()

split()
  • Description: Splits a string column into an array of substrings based on a delimiter.
  • Input → Output: String → Array of Strings
  • Key use case: Splitting strings based on delimiters (e.g., splitting comma-separated values).
  • Example: split(col("string_col"), ",") → ["a", "b", "c"]
  • Different lengths: Not applicable.
  • Null handling: Will split even if the string contains null values (but may produce empty strings).
concat()
  • Description: Concatenates multiple arrays or strings into a single array or string.
  • Input → Output: Arrays/Strings → Array or String
  • Key use case: Merging multiple arrays into one, or multiple strings into one.
  • Example: concat(col("array1"), col("array2")) → ["a", "b", "x", "y"]
  • Different lengths: If input arrays have different lengths, the shorter ones are concatenated as-is.
  • Null handling: If any input column is null, concat() returns null; null elements inside an array are kept as-is.
arrays_zip()
  • Description: Zips multiple arrays element-wise into a single array of structs.
  • Input → Output: Arrays → Array of Structs
  • Key use case: Aligning data from multiple arrays element-wise, treating each set of elements as a row (struct).
  • Example: arrays_zip(col("array1"), col("array2")) → [{'a', 1}, {'b', 2}]
  • Different lengths: If input arrays have different lengths, the shorter ones are padded with null.
  • Null handling: Inserts null values into the struct where input arrays have null for a corresponding index.
explode()
  • Description: Flattens an array into multiple rows, with one row per element in the array.
  • Input → Output: Array → Multiple Rows (with original columns)
  • Key use case: Flattening arrays for row-by-row processing (e.g., after zipping or concatenating arrays).
  • Example: explode(col("array_col")) → Converts an array into separate rows.
  • Different lengths: Not applicable; converts each element into a separate row, regardless of length.
  • Null handling: Preserves null elements during the explosion but still creates separate rows.

Breakdown:

  • split() is used to break a single string into an array of substrings.
  • concat() merges arrays or strings, resulting in a single array or string.
  • arrays_zip() aligns elements from multiple arrays, creating an array of structs (see the sketch below).
  • explode() takes an array and converts it into multiple rows, one for each array element.
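
Since arrays_zip() is the only one of these not demonstrated earlier, here is a minimal sketch on a toy DataFrame (the data is made up for illustration):

from pyspark.sql.functions import arrays_zip, explode, col

df_z = spark.createDataFrame([(["a", "b"], [1, 2])], ["letters", "numbers"])
zipped = df_z.select(arrays_zip("letters", "numbers").alias("zipped"))
zipped.show(truncate=False)   # [{a, 1}, {b, 2}]
zipped.select(explode("zipped").alias("pair")) \
      .select("pair.letters", "pair.numbers").show()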


StructType (), StructField ()

StructType () in PySpark is part of the pyspark.sql.types module and it is used to define the structure of a DataFrame schema.
StructField () is a fundamental part of PySpark’s StructType, used to define individual fields (columns) within a schema. A StructField specifies the name, data type, and other attributes of a column in a DataFrame schema.

StructType Syntax

from pyspark.sql.types import StructType, StructField, StringType, IntegerType 
schema = StructType([ 
StructField("name", StringType(), True), 
StructField("age", IntegerType(), True) 
])

StructField Syntax


StructField(name, dataType, nullable=True, metadata=None)

Parameter

StructType takes fields (optional): a list of StructField objects that define the schema. Each StructField object specifies the name, type, and whether the field can be null.

Key Components

  • name: The name of the column.
  • dataType: The data type of the column (e.g., StringType(), IntegerType(), DoubleType(), etc.).
  • nullable: Boolean flag indicating whether the field can contain null values (True for nullable).

Common Data Types Used in StructField

  • StringType(): Used for string data.
  • IntegerType(): For integers.
  • DoubleType(): For floating-point numbers.
  • LongType(): For long integers.
  • ArrayType(): For arrays (lists) of values.
  • MapType(): For key-value pairs (dictionaries).
  • TimestampType(): For timestamp fields.
  • BooleanType(): For boolean values (True/False).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Define schema using StructType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create DataFrame using the schema
data = [("John", 30), ("Alice", 25)]
df = spark.createDataFrame(data, schema)
df.show()
+-----+---+
| name|age|
+-----+---+
| John| 30|
|Alice| 25|
+-----+---+

Nested Schema

StructType can define a nested schema. For example, a column in the DataFrame might itself contain multiple fields.

nested_schema = StructType([
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True)
    ]), True)
])


contains(), collect(), transform(), udf(), udf for sql

contains ()

The contains() function in PySpark is used to check if a string column contains a specific substring. It’s typically used in a filter() or select() operation to search within the contents of a column and return rows where the condition is true.

Syntax

Column.contains(substring: str)

Key Points

  • contains() is case-sensitive by default. For case-insensitive matching, use it with lower() or upper().
  • It works on string columns only. For non-string columns, you would need to cast the column to a string first.
sample DF
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
| Robert|        IT|
|  Maria|        IT|
+-------+----------+

# Filter rows where 'Department' contains 'Sales'
filtered_df = df.filter(df["Department"].contains("Sales"))
filtered_df.show()
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
+-------+----------+

# Add a new column 'Is_Sales' based on whether 'Department' contains 'Sales'
from pyspark.sql.functions import when
df_with_flag = df.withColumn(
    "Is_Sales",
    when(df["Department"].contains("Sales"), "Yes").otherwise("No")
)

df_with_flag.show()
+-------+----------+--------+
|   Name|Department|Is_Sales|
+-------+----------+--------+
|  James|     Sales|     Yes|
|Michael|     Sales|     Yes|
| Robert|        IT|      No|
|  Maria|        IT|      No|
+-------+----------+--------+

#Case-Insensitive Search
# Search for 'sales' in a case-insensitive manner
from pyspark.sql.functions import lower
df_lower_filtered = df.filter(lower(df["Department"]).contains("sales"))
df_lower_filtered.show()
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
+-------+----------+

collect ()

The collect () function simply gathers all rows from the DataFrame or RDD and returns them as a list of Row objects. It does not accept any parameters.

Syntax

DataFrame.collect()
collect() takes no parameters.

Key Points

  • Use on small data only: since collect() brings all the data to the driver, it should be used only on small datasets. Collecting a very large dataset can cause memory issues or crash the driver.
  • For large datasets, consider alternatives such as take(n), show(n), or toPandas() (see the sketch after the examples below).
sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

# Collect all rows from the DataFrame
collected_data = df.collect()

# Access all rows
print(collected_data)
[Row(Name='Alice', Age=25), Row(Name='Bob', Age=30), Row(Name='Charlie', Age=35)]

# Access a row
print(collected_data[0])
Row(Name='Alice', Age=25)

# Access a cell
print(collected_data[0][0])
Alice

# Display the collected data
for row in collected_data:
    print(row)
==output==
Row(Name='Alice', Age=25)
Row(Name='Bob', Age=30)
Row(Name='Charlie', Age=35)

# Filter and collect
filtered_data = df.filter(df["Age"] > 30).collect()

# Process collected data
for row in filtered_data:
    print(f"{row['Name']} is older than 30.")
==output==
Charlie is older than 30.


# Access specific columns from collected data
for row in collected_data:
    print(f"Name: {row['Name']}, Age: {row.Age}")
==output==
Name: Alice, Age: 25
Name: Bob, Age: 30
Name: Charlie, Age: 35
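
As a minimal sketch of the alternatives mentioned in the key points (assuming pandas is installed for toPandas()):

# Safer options than collect() when the DataFrame may be large
first_two = df.take(2)            # list containing only the first 2 Row objects
df.show(2)                        # prints the first 2 rows, returns nothing
pdf = df.limit(100).toPandas()    # bring at most 100 rows to the driver as a pandas DataFrame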



transform ()

The transform () function in PySpark is a higher-order function introduced to apply custom transformations to columns in a DataFrame. It allows you to perform a transformation function on an existing column, returning the transformed data as a new column. It is particularly useful when working with complex data types like arrays or for applying custom logic to column values.

Syntax

df1 = df.transform(my_function)

Parameters

my_function: a python function

Key point

The transform() method takes a function as an argument. It automatically passes the DataFrame (in this case, df) to that function, so you only need to pass the function name, e.g. my_function.

sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

from pyspark.sql.functions import upper

# Declare my functions
def addTheAge(mydf):
    return mydf.withColumn('Age', mydf.Age + 2)

def upcaseTheName(mydf):
    return mydf.withColumn("Name", upper(mydf.Name))

#transforming: age add 2 years
df1=df.transform(addTheAge)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 27|
|    Bob| 32|
|Charlie| 37|
+-------+---+



#transforming to upcase
df1=df.transform(upcaseTheName)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  ALICE| 25|
|    BOB| 30|
|CHARLIE| 35|
+-------+---+

#transforming to upcase and age add 2 years
df1=df.transform(addTheAge).transform(upcaseTheName)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  ALICE| 27|
|    BOB| 32|
|CHARLIE| 37|
+-------+---+
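
If the transformation needs a parameter, one common pattern is to wrap it in a closure. The sketch below is my own illustration (addYears is an assumed helper, not part of the original post):

# Parameterize a transform with a closure
def addYears(n):
    def _apply(mydf):
        return mydf.withColumn("Age", mydf.Age + n)
    return _apply

df.transform(addYears(5)).show()   # adds 5 years instead of the hard-coded 2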


udf ()

The udf (User Defined Function) allows you to create custom transformations for DataFrame columns using Python functions. You can use UDFs when PySpark’s built-in functions don’t cover your use case or when you need more complex logic.

Syntax

from pyspark.sql.functions import udf
from pyspark.sql.types import DataType
# Define a UDF function ,
# Register the function as a UDF
my_udf = udf(py_function, returnType)

Parameters

  • py_function: A Python function you define to transform the data.
  • returnType: PySpark data type (e.g., StringType(), IntegerType(), DoubleType(), etc.) that indicates what type of data your UDF returns.
    • StringType(): For returning strings.
    • IntegerType(): For integers.
    • FloatType(): For floating-point numbers.
    • BooleanType(): For booleans.
    • ArrayType(DataType): For arrays.
    • StructType(): For structured data.

Key point

You need to specify the return type for the UDF, as PySpark needs to understand how to handle the results of the UDF.

sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+


from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a Python function
def to_upper(name):
    return name.upper()

# Register the function as a UDF, specifying the return type as StringType
uppercase_udf = udf(to_upper, StringType())

# Apply the UDF using withColumn
df_upper = df.withColumn("Upper_Name", uppercase_udf(df["Name"]))

df_upper.show()
+-------+---+----------+
|   Name|Age|Upper_Name|
+-------+---+----------+
|  Alice| 25|     ALICE|
|    Bob| 30|       BOB|
|Charlie| 35|   CHARLIE|
+-------+---+----------+

# UDF Returning Boolean
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
# Define a Python function
def is_adult(age):
    return age > 30

# Register the UDF
is_adult_udf = udf(is_adult, BooleanType())

# Apply the UDF
df_adult = df.withColumn("Is_Adult", is_adult_udf(df["Age"]))

# Show the result
df_adult.show()
+-------+---+--------+
|   Name|Age|Is_Adult|
+-------+---+--------+
|  Alice| 25|   false|
|    Bob| 30|   false|
|Charlie| 35|    true|
+-------+---+--------+

# UDF with Multiple Columns (Multiple Arguments)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a Python function that concatenates two columns
def concat_name_age(name, age):
    return f"{name} is {age} years old"

# Register the UDF
concat_udf = udf(concat_name_age, StringType())

# Apply the UDF to multiple columns
df_concat = df.withColumn("Description", concat_udf(df["Name"], df["Age"]))

# Show the result
df_concat.show()
+-------+---+--------------------+
|   Name|Age|         Description|
+-------+---+--------------------+
|  Alice| 25|Alice is 25 years...|
|    Bob| 30| Bob is 30 years old|
|Charlie| 35|Charlie is 35 yea...|
+-------+---+--------------------+

spark.udf.register(), Register for SQL query

in PySpark, you can use spark.udf.register() to register a User Defined Function (UDF) so that it can be used not only in DataFrame operations but also in SQL queries. This allows you to apply your custom Python functions directly within SQL statements executed against your Spark session.

Syntax

spark.udf.register("registered_pyFun_for_sql", original_pyFun, returnType)

parameter

  • registered_pyFun_for_sql: The name of the UDF, which will be used in SQL queries.
  • original_pyFun: The original Python function that contains the custom logic.
  • returnType: The return type of the UDF, which must be specified (e.g., StringType(), IntegerType()).
sample df
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

# Define a Python function
def original_myPythonFun(name):
    return name.upper()

# Register the function as a UDF for SQL with the return type StringType
spark.udf.register("registered_pythonFun_for_sql"\
                    , original_myPythonFun\
                    , StringType()\
                )

# Use the UDF - "registered_pythonFun_for_sql" in a SQL query
result = spark.sql(\
    "SELECT Name, registered_pythonFun_for_sql(Name) AS Upper_Name \
        FROM people"\
    )
result.show()
+-------+----------+
|   Name|Upper_Name|
+-------+----------+
|  Alice|     ALICE|
|    Bob|       BOB|
|Charlie|   CHARLIE|
+-------+----------+

We can also run the query directly in SQL using the %sql magic command:

%sql
SELECT Name
      , registered_pythonFun_for_sql(Name) AS Upper_Name 
  FROM people
+-------+----------+
|   Name|Upper_Name|
+-------+----------+
|  Alice|     ALICE|
|    Bob|       BOB|
|Charlie|   CHARLIE|
+-------+----------+
A more complex example: declare a Python function, register it as a UDF, and apply it

Sample DataFrame

#Sample DataFrame
data = [
    (1, 2, 3, 9, "alpha", "beta", "gamma"),
    (4, 5, 6, 8, "delta", "epsilon", "zeta")
]

df = spark.createDataFrame(data, ["col1", "col2", "col3", "col9", "str1", "str2", "str3"])
df.show()
+----+----+----+----+-----+-------+-----+
|col1|col2|col3|col9| str1|   str2| str3|
+----+----+----+----+-----+-------+-----+
|   1|   2|   3|   9|alpha|   beta|gamma|
|   4|   5|   6|   8|delta|epsilon| zeta|
+----+----+----+----+-----+-------+-----+

Define the Python function to handle multiple columns and scalars

# Define the Python function to handle multiple columns and scalars

def pyFun(col1, col2, col3, col9, int1, int2, int3, str1, str2, str3):
    # Example business logic
    re_value1 = col1 * int1 + len(str1)
    re_value2 = col2 + col9 + len(str2)
    re_value3 = col3 * int3 + len(str3)
    value4 = re_value1 + re_value2 + re_value3

    # Return multiple values as a tuple
    return re_value1, re_value2, re_value3, value4

Define the return schema for the UDF

# Define the return schema for the UDF
from pyspark.sql.types import StructType, StructField, IntegerType

return_schema = StructType([
    StructField("re_value1", IntegerType(), True),
    StructField("re_value2", IntegerType(), True),
    StructField("re_value3", IntegerType(), True),
    StructField("value4", IntegerType(), True)
])

Register the UDF

# register the UDF
from pyspark.sql.functions import udf

# for SQL, use this instead:
# spark.udf.register("pyFun_udf", pyFun, returnType=return_schema)

pyFun_udf = udf(pyFun, returnType=return_schema)

Apply the UDF to the DataFrame, using ‘lit()’ for constant values

# Apply the UDF to the DataFrame, using 'lit()' for constant values
from pyspark.sql.functions import col, lit

df_with_udf = df.select(
    col("col1"),
    col("col2"),
    col("col3"),
    col("col9"),
    col("str1"),
    col("str2"),
    col("str3"),
    pyFun_udf(
        col("col1"),
        col("col2"),
        col("col3"),
        col("col9"),
        lit(10),  # Use lit() for the constant integer values
        lit(20),
        lit(30),
        col("str1"),
        col("str2"),
        col("str3")
    ).alias("result")
)

# Show the result
df_with_udf.show(truncate=False)

==output==
+----+----+----+----+-----+-------+-----+------------------+
|col1|col2|col3|col9|str1 |str2   |str3 |result            |
+----+----+----+----+-----+-------+-----+------------------+
|1   |2   |3   |9   |alpha|beta   |gamma|{15, 15, 95, 125} |
|4   |5   |6   |8   |delta|epsilon|zeta |{45, 20, 184, 249}|
+----+----+----+----+-----+-------+-----+------------------+

Access the “result”

from pyspark.sql.functions import col
df_result_re_value3 = df_with_udf.select('col1', 'result', col('result').re_value3.alias("re_value3"))
df_result_re_value3.show()
+----+------------------+---------+
|col1|            result|re_value3|
+----+------------------+---------+
|   1| {15, 15, 95, 125}|       95|
|   4|{45, 20, 184, 249}|      184|
+----+------------------+---------+

Register the UDF for use in SQL

The sample DataFrame, the Python function, and the return schema for the UDF are the same as above; only the UDF registration differs.

# register the UDF

spark.udf.register("pyFun_udf", pyFun, returnType=return_schema)

# create a temporary view for SQL query
df.createOrReplaceTempView("my_table")

In a notebook, use the %sql magic command:


%sql
SELECT col1, col2, col3, col9, str1, str2, str3,
       pyFun_udf(col1, col2, col3, col9, 10, 20, 30, str1, str2, str3) AS result
FROM my_table

==output==
col1	col2	col3	col9	str1	str2	str3	result
1	2	3	9	alpha	beta	gamma	{"re_value1":15,"re_value2":15,"re_value3":95,"value4":125}
4	5	6	8	delta	epsilon	zeta	{"re_value1":45,"re_value2":20,"re_value3":184,"value4":249}


Join(), union(), unionAll(), unionByName(), fill(), fillna()

join()

The join() method is used to combine two DataFrames based on a common column or multiple columns. This method is extremely versatile, supporting various types of SQL-style joins such as inner, outer, left, and right joins.

Syntax

DataFrame.join(other, on=None, how=None)

Parameters

  • other: The other DataFrame to join with the current DataFrame.
  • on: A string or a list of column names on which to join. This can also be an expression (using col() or expr()).

how: The type of join to perform. It can be one of

  • 'inner': Inner join (default). Returns rows that have matching values in both DataFrames.
  • 'outer' or 'full': Full outer join. Returns all rows from both DataFrames, with null values for missing matches.
  • 'left' or 'left_outer': Left outer join. Returns all rows from the left DataFrame and matched rows from the right DataFrame. Unmatched rows from the right DataFrame result in null values.
  • 'right' or 'right_outer': Right outer join. Returns all rows from the right DataFrame and matched rows from the left DataFrame. Unmatched rows from the left DataFrame result in null values.
  • 'left_semi': Left semi join. Returns only the rows from the left DataFrame where the join condition is satisfied.
  • 'left_anti': Left anti join. Returns only the rows from the left DataFrame where no match is found in the right DataFrame.
  • 'cross': Cross join (Cartesian product). Returns the Cartesian product of both DataFrames, meaning every row from the left DataFrame is combined with every row from the right DataFrame.
sample datasets
df1
+-------+-------+
|   name|dept_id|
+-------+-------+
|  Alice|      1|
|    Bob|      2|
|Charlie|      3|
+-------+-------+
df2
+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|      1|         HR|
|      2|    Finance|
|      4|Engineering|
+-------+-----------+


# Inner join (default)
df_inner = df1.join(df2, on="dept_id")

==output==
+-------+-----+---------+
|dept_id| name|dept_name|
+-------+-----+---------+
|      1|Alice|       HR|
|      2|  Bob|  Finance|
+-------+-----+---------+


# Inner join with additional filter conditions (the 'age', 'col', and 'value' references here are illustrative)
df_joined = df1.join(df2, (df1['dept_id'] == df2['dept_id']) \
  & (df1['age'] > 33)  \
  & (df1['age'] < 46)  \
  & (df1['col'] == value) \
, "inner")

The other join types behave the same as in SQL. Let's focus on the less familiar semi and anti joins.

  • Left Semi Join: Only returns rows from the left DataFrame that have matches in the right DataFrame.
  • Left Anti Join: Only returns rows from the left DataFrame that don’t have matches in the right DataFrame.
# Left semi join
df_left_semi = df1.join(df2, on="dept_id", how="left_semi")
df_left_semi.show()

==output==
+-------+-----+
|dept_id| name|
+-------+-----+
|      1|Alice|
|      2|  Bob|
+-------+-----+
Charlie's dept_id is 3, which does not appear in df2, so that row is skipped.


# Left anti join
df_left_anti = df1.join(df2, on="dept_id", how="left_anti")
df_left_anti.show()

==output==
+-------+-------+
|dept_id|   name|
+-------+-------+
|      3|Charlie|
+-------+-------+
Only Charlie (dept_id = 3) has no match in df2, so only that row is returned.
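
For completeness, here is a short sketch of a full outer join on the same df1/df2 (the output is described in a comment rather than shown):

# Full outer join keeps unmatched rows from both sides
df_outer = df1.join(df2, on="dept_id", how="outer")
df_outer.show()
# Charlie (dept_id=3) appears with a null dept_name; Engineering (dept_id=4) appears with a null name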

union ()

The union() method is used to combine two DataFrames with the same schema (i.e., same number and type of columns). This operation concatenates the rows of the two DataFrames, similar to a SQL UNION operation, but without removing duplicate rows (like UNION ALL in SQL).

Syntax

DataFrame.union(other)

Key Points

  • Schema Compatibility: Both DataFrames must have the same number of columns, and the data types of the corresponding columns must match.
  • Union Behavior: Unlike SQL’s UNION which removes duplicates, union() in PySpark keeps all rows, including duplicates. This is equivalent to SQL’s UNION ALL.
  • Order of Rows: The rows from the first DataFrame will appear first, followed by the rows from the second DataFrame.
  • Column Names and Data Types Must Match: The column names don’t need to be identical in both DataFrames, but their positions and data types must match. If the number of columns or the data types don’t align, an error will be raised.
  • Union with Different Column Names: Even though column names don’t need to be the same, the columns are merged by position, not by name. If you attempt to union() DataFrames with different column orders, the results could be misleading. Therefore, it’s important to make sure the schemas match.
sample datasets
df1
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+

df2:
+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
|  David| 40|
+-------+---+

# Union the two DataFrames
df_union = df1.union(df2)

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+
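
If you need SQL's UNION semantics (duplicates removed), chain distinct() after union(), as in this small sketch:

# SQL-style UNION = union() followed by distinct()
df_union_distinct = df1.union(df2).distinct()
df_union_distinct.show()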

unionByName ()

The unionByName() method in PySpark is similar to the union() method but with a key distinction: it merges two DataFrames by aligning columns based on column names, rather than their positions.

Syntax

DataFrame.unionByName(other, allowMissingColumns=False)

Parameters

  • other: The other DataFrame to be unioned with the current DataFrame.
  • allowMissingColumns: A boolean flag (False by default). If True, this allows the union of DataFrames even if one DataFrame has columns that are missing in the other. The missing columns in the other DataFrame will be filled with null values.

Key Points

  • Column Name Alignment: The method aligns columns by name, not by their position, which makes it flexible for combining DataFrames that have different column orders.
  • Handling Missing Columns: By default, if one DataFrame has columns that are missing in the other, PySpark will throw an error. However, setting allowMissingColumns=True allows unioning in such cases, and missing columns in one DataFrame will be filled with null values in the other.
  • Duplicate Rows: Just like union(), unionByName() does not remove duplicates, and the result includes all rows from both DataFrames.
sample datasets
df1
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+

df2:
+---+-------+
|age|   name|
+---+-------+
| 35|Charlie|
| 40|  David|
+---+-------+

# Union the two DataFrames
df_union = df1.unionByName(df2)

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+

Handling Missing Columns with allowMissingColumns=True

sample df
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+

+---+-------+-------+
|age|   name|  title|
+---+-------+-------+
| 35|Charlie|Manager|
+---+-------+-------+

# Union by name with missing columns allowed
# Alice's row does not have a "title" value
df_union_missing_columns = df1.unionByName(df2, allowMissingColumns=True)

==output==
+-------+---+-------+
|   name|age|  title|
+-------+---+-------+
|  Alice| 25|   null|
|Charlie| 35|Manager|
+-------+---+-------+

Multiple Columns and Different Schemas

Sample Df
+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|Alice| 25|New York|
+-----+---+--------+

+---+----+
|age|name|
+---+----+
| 30| Bob|
+---+----+

# Union by name with missing columns allowed
df_union = df1.unionByName(df2, allowMissingColumns=True)

==output==
+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|Alice| 25|New York|
|  Bob| 30|    null|
+-----+---+--------+

unionAll ()

unionAll() was an older method used to combine two DataFrames without removing duplicates. However, starting from PySpark 2.0, unionAll() has been deprecated and replaced by union(). The behavior of unionAll() is now identical to that of union() in PySpark.

See union() above for details.


fillna (), df.na.fill()

fillna() is a method used to replace null (or missing) values in a DataFrame with a specified value. It returns a new DataFrame with the null values replaced by the specified value.

Syntax

DataFrame.fillna(value, subset=None)

df.na.fill(value, subset=None)

df.na.fill(value, subset=None) produces the same result as df.fillna(value, subset=None).

Parameters

  • value: The value to replace null with. It can be a scalar value (applied across all columns) or a dictionary (to specify column-wise replacement values). The type of value should match the type of the column you are applying it to (e.g., integers for integer columns, strings for string columns).
  • subset: Specifies the list of column names where the null values will be replaced.
    If not provided, the replacement is applied to all columns.
sample df
+-------+----+----+
|   name| age|dept|
+-------+----+----+
|  Alice|  25|null|
|    Bob|null|  HR|
|Charlie|  30|null|
+-------+----+----+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)

"age" is long, "dept" is string

# Fill null values with default values for all columns
df_filled = df.fillna(0)

==output==
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
Bob's age is filled with 0 because the column is long (numeric). The dept column is not filled and remains null, because 0 does not match its string type.

Handle different Columns with different data type

# Fill nulls in the 'age' column with 0 and in the 'dept' column with "Unknown"
df_filled_columns = df.fillna({"age": 0, "dept": "Unknown"})

df_filled_columns.show()

==output==
+-------+---+-------+
|   name|age|   dept|
+-------+---+-------+
|  Alice| 25|Unknown|
|    Bob|  0|     HR|
|Charlie| 30|Unknown|

Fill Nulls in Specific Subset of Columns

# Fill null values only in the 'age' column
df_filled_age = df.fillna(0, subset=["age"])

df_filled_age.show()
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
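
A quick sketch of the df.na.fill() form mentioned above, which behaves the same as fillna():

# Equivalent to df.fillna({"age": 0, "dept": "Unknown"})
df_filled_na = df.na.fill({"age": 0, "dept": "Unknown"})
df_filled_na.show()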


select()

select() is used to project a subset of columns from a DataFrame or to create new columns based on expressions. It returns a new DataFrame containing only the selected columns or expressions.

Syntax

DataFrame.select(*cols)
df.select("id", "name")
df.select(df.id, df.name)
df.select(df["id"], df["name"])
df.select("name", col("age") + 5)

sample dataframe
+-------+---+---------+
|   name|age|     dept|
+-------+---+---------+
|  Alice| 25|       HR|
|    Bob| 30|  Finance|
|Charlie| 35|Marketing|
+-------+---+---------+
# Select specific columns (name and age)
df_selected = df.select("name", "age")
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+


# Select and transform columns, a new column added.
from pyspark.sql.functions import col

df_transformed = df.select("name", df.age,col("age") + 5)
+-------+---+---------+
|   name|age|(age + 5)|
+-------+---+---------+
|  Alice| 25|       30|
|    Bob| 30|       35|
|Charlie| 35|       40|
+-------+---+---------+

# Using Expressions Inside select()
from pyspark.sql.functions import expr

# Select columns using expressions
df_expr = df.select("name", expr("age + 10").alias("age_plus_10"))
+-------+-----------+
|   name|age_plus_10|
+-------+-----------+
|  Alice|         35|
|    Bob|         40|
|Charlie|         45|
+-------+-----------+

# Select All Columns
df_all_columns = df.select("*")
+-------+---+---------+
|   name|age|     dept|
+-------+---+---------+
|  Alice| 25|       HR|
|    Bob| 30|  Finance|
|Charlie| 35|Marketing|
+-------+---+---------+


distinct(), dropDuplicates(), orderBy(), sort(), groupBy(), agg()

distinct()

distinct () is used to remove duplicate rows from a DataFrame or RDD, leaving only unique rows. It returns a new DataFrame that contains only unique rows from the original DataFrame.

Syntax:

DataFrame.distinct()

Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Apply distinct() method
distinct_df = df.distinct()

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
# Selecting Distinct Values for Specific Columns

#sample DF
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 35|
|  1|Alice| 29|
|  3|Cathy| 29|
+---+-----+---+
distinct_columns = df.select("name", "age").distinct()
distinct_columns.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|  Bob| 35|
|Cathy| 29|
+-----+---+

dropDuplicates ()

dropDuplicates () is used to remove duplicate rows from a DataFrame based on one or more specific columns.

Syntax:

DataFrame.dropDuplicates([col1, col2, …, coln])

Parameters

cols (optional): This is a list of column names based on which you want to drop duplicates. If no column names are provided, dropDuplicates() will behave similarly to distinct(), removing duplicates across all columns.

Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Drop duplicates across all columns (similar to distinct)
df_no_duplicates = df.dropDuplicates()

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

# Drop duplicates based on the "name" column
df_unique_names = df.dropDuplicates(["name"])

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

In the second case, only the first occurrence of each name is kept, while duplicates are removed, regardless of other columns.


orderBy(), sort ()

orderBy() or sort () method is used to sort the rows of a DataFrame based on one or more columns in ascending or descending order. It is equivalent to the SQL ORDER BY clause. The method returns a new DataFrame that is sorted based on the provided column(s).

In PySpark, both orderBy() and sort() methods are available, and they are essentially aliases of each other, with no functional difference. You can use either based on preference:

Syntax:

DataFrame.orderBy(*cols, **kwargs)
DataFrame.sort(*cols, **kwargs)

Parameters

  • cols: Column(s) or expressions to sort by. These can be column names as strings, or PySpark Column objects with a sorting direction (asc()/desc()).
Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 20|
|  David| 35|
+-------+---+
# Sort by 'age' column in ascending order (default)
df_sorted = df.orderBy("age")
df_ordered = df.sort("age")

==output==
+-------+---+
|   name|age|
+-------+---+
|Charlie| 20|
|  Alice| 25|
|    Bob| 30|
|  David| 35|
+-------+---+

# Sorting by multiple columns
original dataset
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|    Bob|  Male| 30|
|  Alice|Female| 22|
|Charlie|  Male| 20|
+-------+------+---+

# Method 1: Using list of column names
df_sorted1 = df.orderBy(["name", "age"], ascending=[True, False])

# Method 2: Using asc() and desc()
from pyspark.sql.functions import desc,asc
df_sorted2 = df.orderBy(asc("name"), desc("age"))

# Method 3: mixing column references with the ascending flag
df_sorted3 = df.orderBy(df["name"], df.age, ascending=[True, False])
df_sorted3.show()
==output==
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|  Alice|Female| 22|
|    Bob|  Male| 30|
|Charlie|  Male| 20|
+-------+------+---+

groupBy ()

groupBy() is a method used to group rows in a DataFrame based on one or more columns, similar to SQL’s GROUP BY clause.

Return Type:

It returns a GroupedData object, on which you can apply aggregate functions (agg(), count(), sum(), etc.) to perform computations on the grouped data.

Syntax:

DataFrame.groupBy(*cols)

Parameters

cols: One or more column names or expressions to group by.

Sample dataframe
+-------+----------+
|   name|department|
+-------+----------+
|  Alice|     Sales|
|    Bob|     Sales|
|Charlie|        HR|
|  David|        HR|
|    Eve|     Sales|
+-------+----------+
# Group by department and count the number of employees in each department
df_grouped = df.groupBy("department").count()

==output==
+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|        HR|    2|
+----------+-----+

# Group by multiple columns
original dataset
+-------+----------+------+
|   name|department|gender|
+-------+----------+------+
|  Alice|     Sales|Female|
|    Bob|     Sales|  Male|
|Charlie|        HR|  Male|
|  David|        HR|  Male|
|    Eve|     Sales|Female|
+-------+----------+------+

# Group by department and gender, then count the number of employees in each group
df_grouped_multi = df.groupBy("department", "gender").count()

+----------+------+-----+
|department|gender|count|
+----------+------+-----+
|     Sales|Female|    2|
|     Sales|  Male|    1|
|        HR|  Male|    2|
+----------+------+-----+

agg ()

The agg() function in PySpark is used for performing aggregate operations on a DataFrame, such as computing sums, averages, counts, and other aggregations. It is often used in combination with groupBy() to perform group-wise aggregations.

Syntax:

from pyspark.sql.functions import sum, avg, count, max, min
DataFrame.agg(*exprs)

Common aggregate functions

  • sum(column): Sum of values in the column.
  • avg(column): Average of values in the column.
  • count(column): Number of non-null values (use countDistinct() for distinct counts).
  • max(column): Maximum value in the column.
  • min(column): Minimum value in the column.
Sample dataframe
+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  Alice| 25|  5000|
|    Bob| 30|  6000|
|Charlie| 20|  4000|
+-------+---+------+
# Apply aggregate functions on the DataFrame
df_agg = df.agg(sum("salary").alias("total_salary"), avg("age").alias("avg_age"))

==output==
+------------+-------+
|total_salary|avg_age|
+------------+-------+
|       15000|   25.0|
+------------+-------+

Aggregating with groupBy()

sample data
+-------+----------+------+
|   name|department|salary|
+-------+----------+------+
|  Alice|     Sales|  5000|
|    Bob|     Sales|  6000|
|Charlie|        HR|  4000|
|  David|        HR|  4500|
+-------+----------+------+

# Group by department and aggregate the sum and average of salaries
df_grouped_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    count("name").alias("num_employees")
)

+----------+------------+----------+-------------+
|department|total_salary|avg_salary|num_employees|
+----------+------------+----------+-------------+
|     Sales|       11000|    5500.0|            2|
|        HR|        8500|    4250.0|            2|
+----------+------------+----------+-------------+

Aggregating multiple columns with different functions

from pyspark.sql.functions import sum, count, avg, max

df_grouped_multi_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    count("name").alias("num_employees"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
)

+----------+------------+-------------+----------+----------+
|department|total_salary|num_employees|avg_salary|max_salary|
+----------+------------+-------------+----------+----------+
|     Sales|       11000|            2|    5500.0|      6000|
|        HR|        8500|            2|    4250.0|      4500|
+----------+------------+-------------+----------+----------+
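
As a small additional sketch, agg() also accepts a dictionary mapping column names to aggregate function names (the output column names, such as max(salary), are generated by Spark):

# Dictionary form of agg()
df.groupBy("department").agg({"salary": "max", "name": "count"}).show()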


alias(), asc(), desc(), cast(), filter(), where(), like() functions

alias ()

alias () is used to assign a temporary name or "alias" to a DataFrame, column, or table, which can then be referenced in further operations.

# for dataframe: 
df1 = df.alias("df1")
df1.show()
==output==
+---+---+
| id|age|
+---+---+
|  1| 25|
|  2| 12|
|  3| 40|
+---+---+

Caution: df.alias("newName") does not create a new, independent DataFrame; it simply attaches an alias that can be referenced later, which is mainly useful for disambiguating columns in joins (see the sketch below).
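
A hedged sketch of where a DataFrame alias is actually useful, a self-join (the comparison id < id is only illustrative):

from pyspark.sql.functions import col

a = df.alias("a")
b = df.alias("b")

# The aliases let us refer unambiguously to each side's columns
pairs = a.join(b, col("a.id") < col("b.id")) \
         .select(col("a.id").alias("id_a"), col("b.id").alias("id_b"))
pairs.show()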

# for column: 
df.select(df.id.alias("new_ID")).show()
df.select(df["id"].alias("new_ID")).show()
df.select(col("id").alias("new_ID")).show()
==output==
+------+
|new_ID|
+------+
|     1|
|     2|
|     3|
+------+

asc(), desc ()

asc (): ascending order when sorting the rows of a DataFrame by one or more columns.

sample df
+---+---+
| id|age|
+---+---+
|  1| 25|
|  2| 12|
|  3| 40|
+---+---+
from pyspark.sql.functions import asc
df.orderBy(asc("age")).show()
==output==
+---+---+
| id|age|
+---+---+
|  2| 12|
|  1| 25|
|  3| 40|
+---+---+

desc (): descending order when sorting the rows of a DataFrame by one or more columns.

from pyspark.sql.functions import desc
df.orderBy(desc("age")).show()
==output==
+---+---+
| id|age|
+---+---+
|  3| 40|
|  1| 25|
|  2| 12|
+---+---+

cast ()

df["column_name"].cast("new_data_type")

This can be a string representing the data type (e.g., "int", "double", "string", etc.) or a PySpark DataType object (like IntegerType(), StringType(), FloatType(), etc.).

Common Data Types:

  • IntegerType(), "int": For integer values.
  • DoubleType(), "double": For double (floating-point) values.
  • FloatType(), "float": For floating-point numbers.
  • StringType(), "string": For text or string values.
  • DateType(), "date": For date values.
  • TimestampType(), "timestamp": For timestamps.
  • BooleanType(), "boolean": For boolean values (true/false).
sample dataframe
+---+---+
| id|age|
+---+---+
|  1| 25|
|  2| 12|
|  3| 40|
+---+---+

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- age: long (nullable = true)
from pyspark.sql.functions import col

# Cast the 'age' column (long) to integer
df1 = df.withColumn("age_int", col("age").cast("int"))
df1.printSchema()

==output==
root
 |-- id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- age_int: integer (nullable = true)


# Cast 'id' from long to string and 'age' from long to double
df_casted = df.withColumn("id", col("id").cast("string")) \
              .withColumn("age", col("age").cast("double"))
df_casted.show()              
df_casted.printSchema()  

==output==
+---+----+
| id| age|
+---+----+
|  1|25.0|
|  2|12.0|
|  3|40.0|
+---+----+

root
 |-- id: string (nullable = true)
 |-- age: double (nullable = true)

filter (), where (),

filter () or where () function is used to filter rows from a DataFrame based on a condition or set of conditions. It works similarly to SQL’s WHERE clause,

df.filter(condition)
df.where(condition)

Condition (for ‘filter’)

  • & (AND)
  • | (OR)
  • ~ (NOT)
  • == (EQUAL)

Every filter() call can be written as where(), and vice versa.

sample dataframe
+------+---+------+
|  Name|Age|Salary|
+------+---+------+
| Alice| 30| 50000|
|   Bob| 25| 30000|
|Alicia| 40| 80000|
|   Ann| 32| 35000|
+------+---+------+

# Filter rows where age is greater than 30 AND salary is greater than 50000
df.filter((df["age"] > 30) & (df["salary"] > 50000))
df.where((df["age"] > 30) & (df["salary"] > 50000))

+------+---+------+
|  Name|Age|Salary|
+------+---+------+
|Alicia| 40| 80000|
+------+---+------+

# Filter rows where age is less than 25 OR salary is less than 40000
df.filter((df["age"] < 25) | (df["salary"] < 40000))
df.where((df["age"] < 25) | (df["salary"] < 40000))

+----+---+------+
|Name|Age|Salary|
+----+---+------+
| Bob| 25| 30000|
| Ann| 32| 35000|
+----+---+------+
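
The NOT operator (~) from the condition list above can be sketched like this on the same sample DataFrame:

# Rows where age is NOT greater than 30
df.filter(~(df["Age"] > 30)).show()

# Rows where the name is NOT 'Bob'
df.where(~(df["Name"] == "Bob")).show()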

like ()


like() function is used to perform pattern matching on string columns, similar to the SQL LIKE operator

df.filter(df["column_name"].like("pattern"))

Pattern

  • %: Represents any sequence of characters.
  • _: Represents a single character.

Patterns are case-sensitive.

sample dataframe
+------+---+
|  Name|Age|
+------+---+
| Alice| 30|
|   Bob| 25|
|Alicia| 28|
|   Ann| 32|
+------+---+


# Filtering names that start with 'Al'
df.filter(df["Name"].like("Al%")).show()

+------+---+
|  Name|Age|
+------+---+
| Alice| 30|
|Alicia| 28|
+------+---+

# Filtering names that end with 'n'
df.filter(df["Name"].like("%n")).show()

+----+---+
|Name|Age|
+----+---+
| Ann| 32|
+----+---+

# Filtering names that contain 'li'
df.filter(df["Name"].like("%li%")).show()

+------+---+
|  Name|Age|
+------+---+
| Alice| 30|
|Alicia| 28|
+------+---+

# Filtering names that start with 'A' and have 'l' as the third letter
df.filter(df["Name"].like("A_l%")).show()

+----+---+
|Name|Age|
+----+---+
+----+---+
No rows match this pattern.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

withColumn, select

withColumn()

withColumn() is a transformation. It adds a new column or replaces an existing one: if column_name already exists, that column is replaced; otherwise, a new column named column_name is added.

Syntax:

from pyspark.sql.functions import col, lit, concat, when, upper, coalesce
df.withColumn("column_name", expression)

Key Parameters

  • "column_name": The name of the new or existing column.
  • expression: Any transformation, calculation, or function applied to create or modify the column.
Basic Column Creation (with literal values)

from pyspark.sql.functions import lit
# Add a column with a constant value
df_new = df.withColumn("New_Column", lit(100))

===output===
+---+-------+-----+----------+
| ID|   Name|Grade|New_Column|
+---+-------+-----+----------+
|  1|  Alice| null|       100|
|  2|    Bob|    B|       100|
|  3|Charlie|    C|       100|
+---+-------+-----+----------+

# Add a new column, no value
df_new = df.withColumn("New_Column", lit(None))

===output===
+---+-------+-----+----------+
| ID|   Name|Grade|New_Column|
+---+-------+-----+----------+
|  1|  Alice| null|      null|
|  2|    Bob|    B|      null|
|  3|Charlie|    C|      null|
+---+-------+-----+----------+

Arithmetic Operation

from pyspark.sql.functions import col
# Create a new column based on arithmetic operations
df_arithmetic = df.withColumn("New_ID", col("ID") * 2 + 5)
df_arithmetic.show()
===output===
+---+-------+-----+------+
| ID|   Name|Grade|New_ID|
+---+-------+-----+------+
|  1|  Alice| null|     7|
|  2|    Bob|    B|     9|
|  3|Charlie|    C|    11|
+---+-------+-----+------+

Using SQL Function

you can use functions like concat(), substring(), when(), length(), etc.

from pyspark.sql.functions import concat, lit, col
# Concatenate two columns with a separator
df_concat = df.withColumn("Full_Description", concat(col("Name"), lit(" has ID "), col("ID")))

Conditional Logic

Using when() and otherwise() is equivalent to SQL’s CASE WHEN expression.

from pyspark.sql.functions import when

# Add a new column with conditional logic
df_conditional = df.withColumn("Is_Adult", when(col("ID") > 18, "Yes").otherwise("No"))
df_conditional.show()
String Function

You can apply string functions like upper(), lower(), or substring()

from pyspark.sql.functions import upper, col
# Convert a column to uppercase
df_uppercase = df.withColumn("Uppercase_Name", upper(col("Name")))
df_uppercase.show()

Type Casting

You can cast a column to a different data type.

Cast the ID column to a string

# Cast the ID column to a string
df_cast = df.withColumn("ID_as_string", col("ID").cast("string"))
df_cast.show()

Handling Null Values

create columns that handle null values using coalesce() or fill()

Coalesce:

This function returns the first non-null value among its arguments.

from pyspark.sql.functions import coalesce, col, lit
# Return non-null value between two columns
df_coalesce = df.withColumn("NonNullValue", coalesce(col("Name"), lit("Unknown")))
df_coalesce.show()

Fill Missing Values:

# Replace nulls in a column with a default value
df_fill = df.na.fill({"Name": "Unknown"})
df_fill.show()

Creating Columns with Complex Expressions

create columns based on more complex expressions or multiple transformations at once.

# Create a column with multiple transformations
df_complex = df.withColumn("Complex_Column", concat(upper(col("Name")), lit("_"), col("ID").cast("string")))
df_complex.show(truncate=False)

example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, when, upper, coalesce

# Initialize Spark session
spark = SparkSession.builder.appName("withColumnExample").getOrCreate()

# Create a sample DataFrame
data = [(1, "Alice", None), (2, "Bob", "B"), (3, "Charlie", "C")]
df = spark.createDataFrame(data, ["ID", "Name", "Grade"])

# Perform various transformations using withColumn()
df_transformed = df.withColumn("ID_Multiplied", col("ID") * 10) \
                   .withColumn("Full_Description", concat(upper(col("Name")), lit(" - ID: "), col("ID"))) \
                   .withColumn("Pass_Status", when(col("Grade") == "C", "Pass").otherwise("Fail")) \
                   .withColumn("Non_Null_Grade", coalesce(col("Grade"), lit("N/A"))) \
                   .withColumn("ID_as_String", col("ID").cast("string"))

# Show the result
df_transformed.show(truncate=False)
==output==
ID	Name	Grade	ID_Multiplied	Full_Description	Pass_Status	Non_Null_Grade	ID_as_String
1	Alice	null	10	ALICE - ID: 1	Fail	N/A	1
2	Bob	B	20	BOB - ID: 2	Fail	B	2
3	Charlie	C	30	CHARLIE - ID: 3	Pass	C	3

select ()

select () is used to project (select) a set of columns or expressions from a DataFrame. This function allows you to choose and work with specific columns, create new columns, or apply transformations to the data.

Syntax

DataFrame.select(*cols)

Commonly Used PySpark Functions with select()

  • col(column_name): Refers to a column.
  • alias(new_name): Assigns a new name to a column.
  • lit(value): Adds a literal value.
  • round(column, decimals): Rounds off the values in a column.
  • concat(col1, col2, ...): Concatenates multiple columns.
  • when(condition, value): Adds conditional logic.
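
A minimal sketch of when() used inside select() (the column names name and age are assumed):

from pyspark.sql import functions as F

df.select(
    "name",
    F.when(F.col("age") >= 30, "senior").otherwise("junior").alias("level")
).show()
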
Renaming Columns:
df.select(df["column1"].alias("new_column1"), df["column2"]).show()

Using Expressions:
from pyspark.sql import functions as F
df.select(F.col("column1"), F.lit("constant_value"), (F.col("column2") + 10).alias("modified_column2")).show()

Performing Calculations
df.select((df["column1"] * 2).alias("double_column1"), F.round(df["column2"], 2).alias("rounded_column2")).show()

Handling Complex Data Types (Struct, Array, Map):
df.select("struct_column.field_name").show()

Selecting with Wildcards:

While PySpark doesn’t support SQL-like wildcards directly, you can achieve similar functionality using selectExpr (discussed below) or other methods like looping over df.columns.

df.select([c for c in df.columns if "some_pattern" in c]).show()

Using selectExpr ()

df.selectExpr("column1", "column2 + 10 as new_column2").show()


Pyspark: read and write a parquet file

Reading Parquet Files

Syntax

help(spark.read.parquet)


df = (spark.read
    .format("parquet")
    .option("mergeSchema", "true")           # Merge schemas of all files (useful when reading multiple files with different schemas)
    .option("pathGlobFilter", "*.parquet")   # Read only specific files based on file name patterns
    .option("recursiveFileLookup", "true")   # Recursively read files from directories and subdirectories
    .load("/path/to/parquet/file/or/directory"))

Options

  • mergeSchema (default: false): When reading Parquet files with different schemas, merge them into a single schema.
  • pathGlobFilter: Specifies a file pattern to filter which files to read (e.g., "*.parquet").
  • recursiveFileLookup (default: false): Reads files recursively from subdirectories.
  • modifiedBefore / modifiedAfter: Filter files by modification time. For example:
    .option("modifiedBefore", "2023-10-01T12:00:00")
    .option("modifiedAfter", "2023-01-01T12:00:00")
  • maxFilesPerTrigger: Limits the number of files processed in a single trigger, useful for streaming jobs.
  • schema: Provides the schema of the Parquet file (useful when reading files without inferring the schema).

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

df = spark.read.schema(schema).parquet("/path/to/parquet")

Path
  • Load All Files in a Directory
    df = spark.read.parquet("/path/to/directory/")
  • Load Multiple Files Using Comma-Separated Paths
    df = spark.read.parquet("/path/to/file1.parquet", "/path/to/file2.parquet", "/path/to/file3.parquet")
  • Using Wildcards (Glob Patterns)
    df = spark.read.parquet("/path/to/directory/*.parquet")
  • Using Recursive Lookup for Nested Directories
    df = spark.read.option("recursiveFileLookup", "true").parquet("/path/to/top/directory")
  • Load Multiple Parquet Files Based on Conditions
    df = spark.read.option("modifiedAfter", "2023-01-01T00:00:00").parquet("/path/to/directory/")
  • Programmatically Load Multiple Files
    file_paths = ["/path/to/file1.parquet", "/path/to/file2.parquet", "/path/to/file3.parquet"]
    df = spark.read.parquet(*file_paths)
  • Load Files from External Storage (e.g., S3, ADLS, etc.)
    df = spark.read.parquet("s3a://bucket-name/path/to/files/")

Example


# Reading Parquet files with options
df = spark.read \
    .format("parquet") \
    .option("mergeSchema", "true") \
    .option("recursiveFileLookup", "true") \
    .load("/path/to/parquet/files")

Conclusion

To load multiple Parquet files at once, you can:

  • Load an entire directory.
  • Use wildcard patterns to match multiple files.
  • Recursively load from subdirectories.
  • Programmatically pass a list of file paths. These options help streamline your data ingestion process when dealing with multiple Parquet files in Databricks.

Write to parquet

Syntax


# Writing a Parquet file
(df.write
    .format("parquet")
    .mode("overwrite")                        # Options: "overwrite", "append", "ignore", "error"
    .option("compression", "snappy")          # Compression options: none, snappy, gzip, lzo, brotli, etc.
    .option("maxRecordsPerFile", 100000)      # Max number of records per file
    .option("path", "/path/to/output/directory")
    .partitionBy("year", "month")             # Partition the output by specific columns
    .save())

Options

compression: .option("compression", "snappy")

Specifies the compression codec to use when writing files.
Options: none, snappy (default), gzip, lzo, brotli, lz4, zstd, etc.

maxRecordsPerFile: .option("maxRecordsPerFile", 100000)

Controls the number of records per file when writing.
Default: None (no limit).

saveAsTable: saveAsTable("parquet_table")

Saves the DataFrame as a table in the catalog.

save(): Writes the DataFrame to the configured output location.

path: Defines the output directory or file path.

mode: mode("overwrite")

Specifies the behavior if the output path already exists.

  • overwrite: Overwrites existing data.
  • append: Appends to existing data.
  • ignore: Ignores the write operation if data already exists.
  • error or errorifexists: Throws an error if data already exists (default).

partitionBy: partitionBy("year", "month")

Partitions the output by the specified columns.

bucketBy: .bucketBy(10, "id")

Distributes the data into a fixed number of buckets.

df.write \
    .bucketBy(10, "id") \
    .sortBy("name") \
.saveAsTable("parquet_table")

Example


# Writing Parquet files with options
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .option("maxRecordsPerFile", 50000) \
    .partitionBy("year", "month") \
    .save("/path/to/output/directory")

Key considerations when writing:

  • Use mergeSchema if the Parquet files have different schemas, but it may increase overhead.
  • Compression can significantly reduce file size, but it can add some processing time during read and write operations.
  • Partitioning by columns is useful for organizing large datasets and improving query performance.
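
As a closing sketch (the path and partition values below are assumed), reading the partitioned output back and filtering on the partition columns lets Spark prune partitions and read fewer files:

# Partition pruning: only the year=2024/month=1 directories are scanned
df_jan = spark.read.parquet("/path/to/output/directory") \
    .filter("year = 2024 AND month = 1")
df_jan.show()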

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)