Overview of Commonly Used Unity Catalog and Spark SQL Management Commands

Summary of frequently use Unity Catalog and Spark SQL management commands, organized in a table.

CategoryCommandDescriptionExample
Catalog ManagementSHOW CATALOGSLists all available catalogs.SHOW CATALOGS;
Schema ManagementSHOW SCHEMAS IN <catalog_name>Lists schemas (databases) within a catalog.SHOW SCHEMAS IN main;
DESCRIBE SCHEMA <catalog_name>.<schema_name>Provides metadata about a specific schema.DESCRIBE SCHEMA main.default;
Table ManagementSHOW TABLES IN <catalog_name>.<schema_name>Lists all tables in a schema.SHOW TABLES IN main.default;
DESCRIBE TABLE <catalog_name>.<schema_name>.<table_name>Displays metadata about a specific table.DESCRIBE TABLE main.default.sales_data;
SHOW PARTITIONS <catalog_name>.<schema_name>.<table_name>Lists partitions of a partitioned table.SHOW PARTITIONS main.default.sales_data;
SHOW COLUMNS IN <catalog_name>.<schema_name>.<table_name>Lists all columns of a table, including their data types.SHOW COLUMNS IN main.default.sales_data;
DROP TABLE <catalog_name>.<schema_name>.<table_name>Deletes a table from the catalog.DROP TABLE main.default.sales_data;
Database ManagementSHOW DATABASESLists all databases (schemas) in the environment.SHOW DATABASES;
DESCRIBE DATABASE <database_name>Provides metadata about a specific database.DESCRIBE DATABASE default;
Data QueryingSELECT * FROM <catalog_name>.<schema_name>.<table_name>Queries data from a table.SELECT * FROM main.default.sales_data WHERE region = 'West';
Table CreationCREATE TABLE <catalog_name>.<schema_name>.<table_name> (<columns>)Creates a managed table in Unity Catalog.CREATE TABLE main.default.sales_data (id INT, region STRING, amount DOUBLE);

arrayType, mapType column and functions

In PySpark, ArrayType and MapType are used to define complex data structures within a DataFrame schema.

ArrayType column, and functions,

ArrayType allows you to store and work with arrays, which can hold multiple values of the same data type.

sample dataframe:
id, numbers|
1, [1, 2, 3]
2, [4, 5, 6]
3, [7, 8, 9]

explode ()

“explode” a given array into individual new rows using the explode function, Offen use it to flatten JSON.

from pyspark.sql.functions import explode

# Explode the 'numbers' array into separate rows
exploded_df = df.withColumn("number", explode(df.numbers))
display(explode_df)
==output==
id	numbers	number
1	[1,2,3]	1
1	[1,2,3]	2
1	[1,2,3]	3
2	[4,5,6]	4
2	[4,5,6]	5
2	[4,5,6]	6
3	[7,8,9]	7
3	[7,8,9]	8
3	[7,8,9]	9
split ()

Split strings based on a specified delimiter, return a array type.

from pyspark.sql.functions import split
df.withColumn(“Name_Split”, split(df[“Name”], “,”))

sample dataframe
+————–+
| Name |
+————–+
| John,Doe |
| Jane,Smith |
| Alice,Cooper |
+————–+

from pyspark.sql.functions import split
# Split the 'Name' column by comma
df_split = df.withColumn("Name_Split", split(df["Name"], ","))

==output==
+-------------+----------------+
| Name        | Name_Split     |
+-------------+----------------+
| John,Doe    | [John, Doe]    |
| Jane,Smith  | [Jane, Smith]  |
| Alice,Cooper| [Alice, Cooper]|
+-------------+----------------+
array ()

Creates an array column.

from pyspark.sql.functions import array, col
data=[(1,2,3),(4,5,6)]
schema=['num1','num2','num3']
df1=spark.createDataFrame(data,schema)
df1.show()
# create a new column - numbers, array type. elements use num1,num2,num3   
df1.withColumn("numbers",array(col("num1"),col("num2"),col("num3"))).show()

==output==
+----+----+----+
|num1|num2|num3|
+----+----+----+
|   1|   2|   3|
|   4|   5|   6|
+----+----+----+

#new array column "numbers" created
+----+----+----+-----------+
|num1|num2|num3| numbers   |
+----+----+----+-----------+
|   1|   2|   3| [1, 2, 3] |
|   4|   5|   6| [4, 5, 6] |
+----+----+----+-----------+
array_contains ()

Checks if an array contains a specific element.

from pyspark.sql.functions import array_contains
array_contains(array, value)

sample dataframe
+—+———————–+
|id |fruits |
+—+———————–+
|1 |[apple, banana, cherry]|
|2 |[orange, apple, grape] |
|3 |[pear, peach, plum] |
+—+———————–+

from pyspark.sql.functions import array_contains

# Using array_contains to check if the array contains 'apple'
df.select("id", array_contains("fruits", "apple").alias("has_apple")).show()

==output==
+---+----------+
| id|has_apple |
+---+----------+
|  1|      true|
|  2|      true|
|  3|     false|
+---+----------+
getItem()

Access individual elements of an array by their index using the getItem() method

# Select the second element (index start from 0) of the 'numbers' array
df1 = df.withColumn("item_1_value",   df.numbers.getItem(1))
display(df1)
==output==
id	numbers	      item_1_value
1	[1,2,3]	       2
2	[4,5,6]	       5
3	[7,8,9]	       8
size ()

Returns the size of the array.

from pyspark.sql.functions import size

# Get the size of the 'numbers' array
df.select(size(df.numbers)).show()

==output==
+-------------+
|size(numbers)|
+-------------+
|            3|
|            3|
|            3|
+-------------+
sort_array()

Sorts the array elements.

sort_array(col: ‘ColumnOrName’, asc: bool = True)

If `asc` is True (default) then ascending and if False then descending. if asc=True, can be omitted.

from pyspark.sql.functions import sort_array
df.withColumn("numbers", sort_array("numbers")).show()
==output==
ascending 
+---+---------+
| id|  numbers|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  3|[7, 8, 9]|
+---+---------+
df.select(sort_array("numbers", asc=False).alias("sorted_desc")).show()
==output==
descending 
+-----------+
|sorted_desc|
+-----------+
|  [3, 2, 1]|
|  [6, 5, 4]|
|  [9, 8, 7]|
+-----------+
concat ()

concat() is used to concatenate arrays (or strings) into a single array (or string). When dealing with ArrayType, concat() is typically used to combine two or more arrays into one.

from pyspark.sql.functions import concat
concat(*cols)

sample DataFrames
+—+——+——+
|id |array1|array2|
+—+——+——+
|1 | [a, b] | [x, y]|
|2 | [c] | [z] |
|3 | [d, e] | null |
+—+——-+——+

from pyspark.sql.functions import concat

# Concatenating array columns
df_concat = df.withColumn("concatenated_array", concat(col("array1"), col("array2")))
df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |null              |
+---+------+------+------------------+

Handling null Values

If any of the input columns are null, the entire result can become null. This is why you’re seeing null instead of just the non-null array.

To handle this, you can use coalesce() to substitute null with an empty array before performing the concat(). coalesce() returns the first non-null argument. Here’s how you can modify your code:

from pyspark.sql.functions import concat, coalesce, lit

# Define an empty array for the same type
empty_array = array()

# Concatenate with null handling using coalesce
df_concat = df.withColumn(
    "concatenated_array",
    concat(coalesce(col("array1"), empty_array), coalesce(col("array2"), empty_array))
)

df_concat.show(truncate=False)

==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1  |[a, b]|[x, y]|[a, b, x, y]      |
|2  |[c]   |[z]   |[c, z]            |
|3  |[d, e]|null  |[d, e]            |
+---+------+------+------------------+
array_zip ()

Combines arrays into a single array of structs.


☰ MapType column, and functions

MapType is used to represent map key-value pair similar to python Dictionary (Dic)

from pyspark.sql.types import MapType, StringType, IntegerType
# Define a MapType
my_map = MapType(StringType(), IntegerType(), valueContainsNull=True)

Parameters:

  • keyType: Data type of the keys in the map. You can use PySpark data types like StringType(), IntegerType(), DoubleType(), etc.
  • valueType: Data type of the values in the map. It can be any valid PySpark data type
  • valueContainsNull: Boolean flag (optional). It indicates whether null values are allowed in the map. Default is True.

sample dataset
# Sample dataset (Product ID and prices in various currencies)
data = [
(1, {“USD”: 100, “EUR”: 85, “GBP”: 75}),
(2, {“USD”: 150, “EUR”: 130, “GBP”: 110}),
(3, {“USD”: 200, “EUR”: 170, “GBP”: 150}),
]


sample dataframe
+———-+————————————+
|product_id|prices |
+———-+————————————+
|1 |{EUR -> 85, GBP -> 75, USD -> 100} |
|2 |{EUR -> 130, GBP -> 110, USD -> 150}|
|3 |{EUR -> 170, GBP -> 150, USD -> 200}|
+———-+————————————+

Accessing map_keys (), map_values ()

Extract keys (currency codes) and values (prices):

from pyspark.sql.functions import col, map_keys, map_values
# Extract map keys and values
df.select(
    col("product_id"),
    map_keys(col("prices")).alias("currencies"),
    map_values(col("prices")).alias("prices_in_currencies")
).show(truncate=False)

==output==
+----------+---------------+--------------------+
|product_id|currencies     |prices_in_currencies|
+----------+---------------+--------------------+
|1         |[EUR, GBP, USD]|[85, 75, 100]       |
|2         |[EUR, GBP, USD]|[130, 110, 150]     |
|3         |[EUR, GBP, USD]|[170, 150, 200]     |
+----------+---------------+--------------------+
exploder ()

Use explode () to flatten the map into multiple rows, where each key-value pair from the map becomes a separate row.

from pyspark.sql.functions import explode
# Use explode to flatten the map
df_exploded = df.select("product_id", explode("prices").alias("currency", "price")).show()

==output==
+----------+--------+-----+
|product_id|currency|price|
+----------+--------+-----+
|         1|     EUR|   85|
|         1|     GBP|   75|
|         1|     USD|  100|
|         2|     EUR|  130|
|         2|     GBP|  110|
|         2|     USD|  150|
|         3|     EUR|  170|
|         3|     GBP|  150|
|         3|     USD|  200|
+----------+--------+-----+
Accessing specific elements in the map

To get the price for a specific currency (e.g., USD) for each product:

from pyspark.sql.functions import col, map_keys, map_values
# Access the value for a specific key in the map 
df.select(
    col("product_id"),
    col("prices").getItem("USD").alias("price_in_usd")
).show(truncate=False)

==output==
+----------+------------+
|product_id|price_in_usd|
+----------+------------+
|1         |100         |
|2         |150         |
|3         |200         |
+----------+------------+
filtering

filter the rows based on conditions involving the map values

from pyspark.sql.functions import col, map_keys, map_values
# Filter rows where price in USD is greater than 150
df.filter(col("prices").getItem("USD") > 150).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
map_concat ()

Combines two or more map columns by merging their key-value pairs.

from pyspark.sql.functions import map_concat, create_map, lit

# Define the additional currency as a new map using create_map()
additional_currency = create_map(lit("CAD"), lit(120))

# Add a new currency (e.g., CAD) with a fixed price to all rows
df.withColumn(
    "updated_prices",
    map_concat(col("prices"), additional_currency)
).show(truncate=False)

==output==
+----------+------------------------------------+
|product_id|prices                              |
+----------+------------------------------------+
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+

withColumnRenamed(), drop(), show()

withColumnRenamed ()

withColumnRenamed(), Rename an existing column in a DataFrame

DataFrame.withColumnRenamed(existingName, newName)

existingName: The current name of the column you want to rename.
newName: The new name for the column.

drop ()

In PySpark, you can drop columns from a DataFrame using the drop() method. Here’s a breakdown of the syntax, options, parameters, and examples for dropping columns in PySpark.

Syntax

DataFrame.drop(*cols)

*cols: One or more column names (as strings) that you want to drop from the DataFrame. You can pass these as individual arguments or a list of column names.

# Drop single column 'age'
df_dropped = df.drop("age")

# Drop multiple columns 'id' and 'age'
df_dropped_multiple = df.drop("id", "age")

# Dropping Columns Using a List of Column Names
# Define columns to drop
columns_to_drop = ["id", "age"]

# Drop columns using list
df_dropped_list = df.drop(*columns_to_drop)
df_dropped_list.show()
Show()

By default, display 20 row, truncate 20 characters

df.show(n=10, truncate= True, vertical=True)

Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS

If a Delta table is saved in Blob Storage or Azure Data Lake Storage (ADLS), you access it using the file path rather than a cataloged name (like in Unity Catalog). Here’s how to read from and write to Delta tables stored in Blob Storage or ADLS in Spark SQL and PySpark.

Reading Delta Tables from Blob Storage or ADLS

To read Delta tables from Blob Storage or ADLS, you specify the path to the Delta table and use the delta. format.

Syntax

# Spark SQL
SELECT * FROM delta.`/mnt/path/to/delta/table`caution: " ` " - backticks# pyspark
df = spark.read.format("delta").load("path/to/delta/table")
  

Writing Delta Tables to Blob Storage or ADLS

When writing to Delta tables, use the delta format and specify the path where you want to store the table.

Spark SQL cannot directly write to a Delta table in Blob or ADLS (use PySpark for this). However, you can run SQL queries and insert into a Delta table using INSERT INTO:

# SparkSQL
INSERT INTO delta.`/mnt/path/to/delta/table`SELECT * FROM my_temp_table
caution: " ` " - backticks

# PySpark 
df.write.format("delta").mode("overwrite").save("path/to/delta/table")

Options and Parameters for Delta Read/Write

Options for Reading Delta Tables:

You can configure the read operation with options like:

  • mergeSchema: Allows schema evolution if the structure of the Delta table changes.
  • spark.sql.files.ignoreCorruptFiles: Ignores corrupt files during reading.
  • timeTravel: Enables querying older versions of the Delta table.
df = spark.read.format("delta").option("mergeSchema", "true").load("path/to/delta/table")
df.show()

Options for Writing Delta Tables:

mode: Controls the write mode.

  • overwrite: Overwrites the existing data.
  • append: Adds to existing data.
  • ignore: Ignores the write if data exists.
  • errorifexists: Defaults to throwing an error if data exists.

partitionBy: Partition the data by one or more columns.

overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("column_name") \
    .save("path/to/delta/table")

Time Travel and Versioning with Delta (PySpark)

Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()

# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()

Conclusion:

  • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
  • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
  • For reading, use .format("delta").load("path").
  • For writing, use .write.format("delta").save("path").

Read table from Unity Catalog and write table to Unity Catalog

To read from and write to Unity Catalog in PySpark, you typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name.

Reading from Unity Catalog

To read a table from Unity Catalog, specify the table’s full path:

# Reading a table
df = spark.read.table("catalog.schema.table")
df.show()

# Using Spark SQL
df = spark.sql("SELECT * FROM catalog.schema.table")

Writing to Unity Catalog

To write data to Unity Catalog, you specify the table name in the saveAsTable method:

# Writing a DataFrame to a new table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog.schema.new_table")

Options for Writing to Unity Catalog:

  • format: Set to "delta" for Delta Lake tables, as Unity Catalog uses Delta format.
  • mode: Options include overwrite, append, ignore, and error.

Example: Read, Transform, and Write Back to Unity Catalog

# Read data from a Unity Catalog table
df = spark.read.table("catalog_name.schema_name.source_table")

# Perform transformations
transformed_df = df.filter(df["column_name"] > 10)

# Write transformed data back to a different table
transformed_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog_name.schema_name.target_table")

Comparison of Delta, JSON, and CSV Reads/Writes

FormatStorage LocationRead SyntaxWrite SyntaxNotes
DeltaUnity Catalogdf = spark.read.table("catalog.schema.table")df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.table")Unity Catalog natively supports Delta with schema enforcement and versioning.
Blob/ADLSdf = spark.read.format("delta").load("path/to/delta/folder")df.write.format("delta").mode("overwrite").save("path/to/delta/folder")Requires Delta Lake library; supports ACID transactions and time-travel capabilities.
JSONUnity CatalogNot directly supported in Unity Catalog; typically needs to be read as a Delta table or temporary table.Not directly supported; must be converted to Delta format before writing to Unity Catalog.Convert JSON to Delta format to enable integration with Unity Catalog.
Blob/ADLSdf = spark.read.json("path/to/json/files")df.write.mode("overwrite").json("path/to/json/folder")Simple structure, no schema enforcement by default; ideal for semi-structured data.
CSVUnity CatalogNot directly supported; CSV files should be imported as Delta tables or temporary views.Not directly supported; convert to Delta format for compatibility with Unity Catalog.Similar to JSON, requires conversion for use in Unity Catalog.
Blob/ADLSdf = spark.read.option("header", True).csv("path/to/csv/files")df.write.option("header", True).mode("overwrite").csv("path/to/csv/folder")Lacks built-in schema enforcement; additional steps needed for ACID or schema evolution.

Detailed Comparison and Notes:

  1. Unity Catalog
    • Delta: Unity Catalog fully supports Delta format, allowing for schema evolution, ACID transactions, and built-in security and governance.
    • JSON and CSV: To use JSON or CSV in Unity Catalog, convert them into Delta tables or load them as temporary views before making them part of Unity’s governed catalog. This is because Unity Catalog enforces structured data formats with schema definitions.
  2. Blob Storage & ADLS (Azure Data Lake Storage)
    • Delta: Blob Storage and ADLS support Delta tables if the Delta Lake library is enabled. Delta on Blob or ADLS retains most Delta features but may lack some governance capabilities found in Unity Catalog.
    • JSON & CSV: Both Blob and ADLS provide support for JSON and CSV formats, allowing flexibility with semi-structured data. However, they do not inherently support schema enforcement, ACID compliance, or governance features without Delta Lake.
  3. Delta Table Benefits:
    • Schema Evolution and Enforcement: Delta enables schema evolution, essential in big data environments.
    • Time Travel: Delta provides versioning, allowing access to past versions of data.
    • ACID Transactions: Delta ensures consistency and reliability in large-scale data processing.

from_json(), to_json()

from_json()

from_json() is a function used to parse a JSON string into a structured DataFrame format (such as StructType, ArrayType, etc.). It is commonly used to deserialize JSON strings stored in a DataFrame column into complex types that PySpark can work with more easily.

Syntax

from_json(column, schema, options={})

Parameters

column: The column containing the JSON string. Can be a string that refers to the column name or a column object.

schema

Specifies the schema of the expected JSON structure.
Can be a StructType (or other types like ArrayType depending on the JSON structure).

options

  • allowUnquotedFieldNames: Allows field names without quotes. (default: false)
  • allowSingleQuotes: Allows parsing single-quoted JSON strings. (default: true)
  • allowNumericLeadingZeros: Allows leading zeros in numbers. (default: false)
  • allowBackslashEscapingAnyCharacter: Allows escaping any character with a backslash. (default: false)
  • mode: Controls how to handle malformed records
    PERMISSIVE: The default mode that sets null values for corrupted fields.
    DROPMALFORMED: Discards rows with malformed JSON strings.
    FAILFAST: Fails the query if any malformed records are found.
Sample DF
+----------------------------+
|json_string                 |
+----------------------------+
|{"name": "John", "age": 30} |
|{"name": "Alice", "age": 25}|
+----------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import from_json, col
# Define the schema for the nested JSON
schema = StructType([
    StructField("name", StructType([
        StructField("first", StringType(), True),
        StructField("last", StringType(), True)
    ]), True),
    StructField("age", IntegerType(), True)
])

# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))

# Display the parsed JSON
df_parsed.select("parsed_json.*").show(truncate=False)
+--------+---+
| name   |age|
+--------+---+
|{John, Doe}|30|
|{Alice, Smith}|25|
+--------+---+

to_json()

to_json() is a function that converts a structured column (such as one of type StructType, ArrayType, etc.) into a JSON string.

Syntax

to_json(column, options={})

Parameters

column: The column you want to convert into a JSON string.
The column should be of a complex data type, such as StructType, ArrayType, or MapType.
Can be a column name (as a string) or a Column object.

options

  • pretty: If set to true, it pretty-prints the JSON output.
  • dateFormat: Specifies the format for DateType and TimestampType columns (default: yyyy-MM-dd).
  • timestampFormat: Specifies the format for TimestampType columns (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
  • ignoreNullFields: When set to true, null fields are omitted from the resulting JSON string (default: true).
  • compression: Controls the compression codec used to compress the JSON output, e.g., gzip, bzip2.
sample data
+--------------------------------------------------------+
|json_string                                             |
+--------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|
+--------------------------------------------------------+


from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructType
from pyspark.sql.functions import from_json, to_json, col# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))
df_parsed.show(truncate=False)

+--------------------------------------------------------+--------------------+
|json_string                                             |parsed_json         |
+--------------------------------------------------------+--------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}   |{{John, Doe}, 30}   |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25}|{{Alice, Smith}, 25}|
+--------------------------------------------------------+--------------------+

Comparison: split (), concat (), array_zip (), and explode ()

Featuresplit()concat()array_zip()explode()
DescriptionSplits a string column into an array of substrings based on a delimiter.Concatenates multiple arrays or strings into a single array or string.Zips multiple arrays element-wise into a single array of structs.Flattens an array into multiple rows, with one row per element in the array.
Input TypeStringArrays/StringsArraysArray
Output TypeArray of StringsArray or StringArray of StructsMultiple Rows (with original columns)
Key Use CasesSplitting strings based on delimiters (e.g., splitting comma-separated values).Merging multiple arrays into one, or multiple strings into one.Aligning data from multiple arrays element-wise, treating each set of elements as a row (struct).Flattening arrays for row-by-row processing (e.g., after zipping or concatenating arrays).
Examplesplit(col("string_col"), ",")["a", "b", "c"]concat(col("array1"), col("array2"))["a", "b", "x", "y"]array_zip(col("array1"), col("array2"))[{'a', 1}, {'b', 2}]explode(col("array_col")) → Converts an array into separate rows.
Handling Different LengthsNot applicableIf input arrays have different lengths, the shorter ones are concatenated as-is.If input arrays have different lengths, the shorter ones are padded with null.Not applicable. Converts each element into separate rows, regardless of length.
Handling null valuesWill split even if the string contains null values (but may produce empty strings).If arrays contain null, concat() still works, returning the non-null elements.Inserts null values into the struct where input arrays have null for a corresponding index.Preserves null elements during the explosion but still creates separate rows.

Breakdown:

  • split() is used to break a single string into an array of substrings.
  • concat() merges arrays or strings, resulting in a single array or string.
  • array_zip() aligns elements from multiple arrays, creating an array of structs.
  • explode() takes an array and converts it into multiple rows, one for each array element.

Comparison of transform() and udf() in PySpark

transform ()

Usage: transform() is a higher-order function available in PySpark that applies a custom function element-wise to each element in an array column. It’s a DataFrame-specific function that is mainly used for manipulating array elements in a column.

Performance: Since transform() operates natively within Spark’s execution engine, it is highly optimized. It avoids the performance overhead that comes with using UDFs.

udf ()

Usage: udf() allows you to define and apply a user-defined function to a column or multiple columns. It can be used for more general-purpose operations, applying Python functions to rows or columns that don’t necessarily need to be array types.

Performance: UDFs in PySpark are not optimized as they run Python code within the JVM context, which introduces significant overhead due to serialization and deserialization (data is transferred between Python and JVM).

Side by side comparison

Function TypeHigher-order functionUser-defined function (UDF)
PurposeApply a function element-wise to array columnsApply custom Python logic to any column or columns
PerformanceHighly optimized, uses Spark’s Catalyst engineSlower, incurs overhead due to Python-JVM communication
Supported Data TypesArrays (array of elements in a column)Any data type (strings, integers, arrays, etc.)
Return TypeReturns native Spark typesCan return complex types but requires explicit return type definition
Ease of UseSimple to use for array-specific transformationsFlexible but requires registering the function and specifying return types
Serialization OverheadNo overhead (operates entirely within Spark engine)Significant overhead as data moves between Python and JVM
When to Use– When you have an array column and need to modify each element
– For array transformations with high performance
– When complex logic is required that cannot be expressed using native PySpark functions
– When you need full flexibility for applying custom logic to various data types
FlexibilityLimited to arrays and simple element-wise operationsVery flexible, can handle any data type and complex custom logic
ParallelismUses Spark’s internal optimizations for parallelismPython UDFs don’t leverage full Spark optimizations and may run slower
Built-in FunctionsCan use SQL expressions or anonymous functionsRequires explicit Python code, even for simple operations
Examplepython df.withColumn("squared_numbers", F.expr("transform(numbers, x -> x * x)"))python square_udf = udf(lambda x: x * x, IntegerType()) df.withColumn("squared_value", square_udf("value"))
Error HandlingErrors are handled natively by SparkMay require extra effort to handle errors within the UDF logic
Use Case Limitations– Only for array column manipulation– Can be used on any column type but slower compared to native functions

Key Takeaway

  • transform() is faster and preferred for operations on array columns when the logic is simple and can be expressed within Spark’s native functions.
  • udf() provides the flexibility to handle more complex and custom transformations but comes at a performance cost, so it should be used when necessary and avoided if native Spark functions can achieve the task.

StructType(), StructField()

StructType (), StructField ()

StructType () in PySpark is part of the pyspark.sql.types module and it is used to define the structure of a DataFrame schema.
StructField () is a fundamental part of PySpark’s StructType, used to define individual fields (columns) within a schema. A StructField specifies the name, data type, and other attributes of a column in a DataFrame schema.

StructType Syntax

from pyspark.sql.types import StructType, StructField, StringType, IntegerType 
schema = StructType([ 
StructField("name", StringType(), True), 
StructField("age", IntegerType(), True) 
])

StructField Syntax


StructField(name, dataType, nullable=True, metadata=None)

Parameter

fields (optional): A list of StructField objects that define the schema. Each StructField object specifies the name, type, and whether the field can be null.

Key Components

  • name: The name of the column.
  • dataType: The data type of the column (e.g., StringType(), IntegerType(), DoubleType(), etc.).
  • nullable: Boolean flag indicating whether the field can contain null values (True for nullable).

Common Data Types Used in StructField

  • StringType(): Used for string data.
  • IntegerType(): For integers.
  • DoubleType(): For floating-point numbers.
  • LongType(): For long integers.
  • ArrayType(): For arrays (lists) of values.
  • MapType(): For key-value pairs (dictionaries).
  • TimestampType(): For timestamp fields.
  • BooleanType(): For boolean values (True/False).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Define schema using StructType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create DataFrame using the schema
data = [("John", 30), ("Alice", 25)]
df = spark.createDataFrame(data, schema)
df.show()
+-----+---+
| name|age|
+-----+---+
| John| 30|
|Alice| 25|
+-----+---+

Nested Schema

StructType can define a nested schema. For example, a column in the DataFrame might itself contain multiple fields.

nested_schema = StructType([
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True)
    ]), True)
])

contains(), collect(), transform(), udf(), udf for sql

contains ()

The contains() function in PySpark is used to check if a string column contains a specific substring. It’s typically used in a filter() or select() operation to search within the contents of a column and return rows where the condition is true.

Syntax

Column.contains(substring: str)

Key Points

  • contains() is case-sensitive by default. For case-insensitive matching, use it with lower() or upper().
  • It works on string columns only. For non-string columns, you would need to cast the column to a string first.
sample DF
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
| Robert|        IT|
|  Maria|        IT|
+-------+----------+

# Filter rows where 'Department' contains 'Sales'
filtered_df = df.filter(df["Department"].contains("Sales"))
filtered_df.show()
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
+-------+----------+

# Add a new column 'Is_Sales' based on whether 'Department' contains 'Sales'
from pyspark.sql.functions import when
df_with_flag = df.withColumn(
    "Is_Sales",
    when(df["Department"].contains("Sales"), "Yes").otherwise("No")
)

df_with_flag.show()
+-------+----------+--------+
|   Name|Department|Is_Sales|
+-------+----------+--------+
|  James|     Sales|     Yes|
|Michael|     Sales|     Yes|
| Robert|        IT|      No|
|  Maria|        IT|      No|
+-------+----------+--------+

#Case-Insensitive Search
# Search for 'sales' in a case-insensitive manner
from pyspark.sql.functions import lower
df_lower_filtered = df.filter(lower(df["Department"]).contains("sales"))
df_lower_filtered.show()
+-------+----------+
|   Name|Department|
+-------+----------+
|  James|     Sales|
|Michael|     Sales|
+-------+----------+

collect ()

The collect () function simply gathers all rows from the DataFrame or RDD and returns them as a list of Row objects. It does not accept any parameters.

Syntax

DataFrame.collect()
not any parameter at all

Key Points

  • Use on Small Data: Since collect() brings all the data to the driver, it should be used only on small datasets. If you try to collect a very large dataset, it can cause memory issues or crash the driver..
  • For large datasets, consider using alternatives like take(n), toPandas(), show(n)
sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

# Collect all rows from the DataFrame
collected_data = df.collect()

# Access all rows
print(collected_data)
[Row(Name='Alice', Age=25), Row(Name='Bob', Age=30), Row(Name='Charlie', Age=35)]

# Access a row
print(collected_data[0])
Row(Name='Alice', Age=25)

# Access a cell
print(collected_data[0][0])
Alice

# Display the collected data
for row in collected_data:
    print(row)
==output==
Row(Name='Alice', Age=25)
Row(Name='Bob', Age=30)
Row(Name='Charlie', Age=35)

# Filter and collect
filtered_data = df.filter(df["Age"] > 30).collect()

# Process collected data
for row in filtered_data:
    print(f"{row['Name']} is older than 30.")
==output==
Charlie is older than 30.


# Access specific columns from collected data
for row in collected_data:
    print(f"Name: {row['Name']}, Age: {row.Age}")
==output==
Name: Alice, Age: 25
Name: Bob, Age: 30
Name: Charlie, Age: 35



transform ()

The transform () function in PySpark is a higher-order function introduced to apply custom transformations to columns in a DataFrame. It allows you to perform a transformation function on an existing column, returning the transformed data as a new column. It is particularly useful when working with complex data types like arrays or for applying custom logic to column values.

Syntax

df1 = df.transform(my_function)

Parameters

my_function: a python function

Key point

The transform() method takes a function as an argument. It automatically passes the DataFrame (in this case, df) to the function. So, you only need to pass the function namemy_function"

sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

from pyspark.sql.functions import upper
#Declare my function
def addTheAge(mydf):    
    return mydf.withColumn('age',mydf.age + 2)

def upcaseTheName(mydf):
    reture mydf.withColumn("Name", upper(mydf.Name))

#transforming: age add 2 years
df1=df.transform(addTheAge)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 27|
|    Bob| 32|
|Charlie| 37|
+-------+---+

The transform () method takes a function as an argument. It automatically passes the DataFrame (in this case, df) to the function. So, you only need to pass the function name "my_function".


#transforming to upcase
df1=df.transform(upcaseTheName)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  ALICE| 25|
|    BOB| 30|
|CHARLIE| 35|
+-------+---+

#transforming to upcase and age add 2 years
df1=df.transform(addTheAge).transform(upcaseTheName)
df1.show()
+-------+---+
|   Name|Age|
+-------+---+
|  ALICE| 27|
|    BOB| 32|
|CHARLIE| 37|
+-------+---+


udf ()

The udf (User Defined Function) allows you to create custom transformations for DataFrame columns using Python functions. You can use UDFs when PySpark’s built-in functions don’t cover your use case or when you need more complex logic.

Syntax

from pyspark.sql.functions import udf
from pyspark.sql.types import DataType
# Define a UDF function ,
# Register the function as a UDF
my_udf = udf(py_function, returnType)

Parameters

  • py_function: A Python function you define to transform the data.
  • returnType: PySpark data type (e.g., StringType(), IntegerType(), DoubleType(), etc.) that indicates what type of data your UDF returns.
    • StringType(): For returning strings.
    • IntegerType(): For integers.
    • FloatType(): For floating-point numbers.
    • BooleanType(): For booleans.
    • ArrayType(DataType): For arrays.
    • StructType(): For structured data.

Key point

You need to specify the return type for the UDF, as PySpark needs to understand how to handle the results of the UDF.

sample DF
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+


from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a Python function
def to_upper(name):
    return name.upper()

# Register the function as a UDF, specifying the return type as StringType
uppercase_udf = udf(to_upper, StringType())

# Apply the UDF using withColumn
df_upper = df.withColumn("Upper_Name", uppercase_udf(df["Name"]))

df_upper.show()
+-------+---+----------+
|   Name|Age|Upper_Name|
+-------+---+----------+
|  Alice| 25|     ALICE|
|    Bob| 30|       BOB|
|Charlie| 35|   CHARLIE|
+-------+---+----------+

# UDF Returning Boolean
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
# Define a Python function
def is_adult(age):
    return age > 30

# Register the UDF
is_adult_udf = udf(is_adult, BooleanType())

# Apply the UDF
df_adult = df.withColumn("Is_Adult", is_adult_udf(df["Age"]))

# Show the result
df_adult.show()
+-------+---+--------+
|   Name|Age|Is_Adult|
+-------+---+--------+
|  Alice| 25|   false|
|    Bob| 30|   false|
|Charlie| 35|    true|
+-------+---+--------+

# UDF with Multiple Columns (Multiple Arguments)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a Python function that concatenates two columns
def concat_name_age(name, age):
    return f"{name} is {age} years old"

# Register the UDF
concat_udf = udf(concat_name_age, StringType())

# Apply the UDF to multiple columns
df_concat = df.withColumn("Description", concat_udf(df["Name"], df["Age"]))

# Show the result
df_concat.show()
+-------+---+--------------------+
|   Name|Age|         Description|
+-------+---+--------------------+
|  Alice| 25|Alice is 25 years...|
|    Bob| 30| Bob is 30 years old|
|Charlie| 35|Charlie is 35 yea...|
+-------+---+--------------------+

spark.udf.register(), Register for SQL query

in PySpark, you can use spark.udf.register() to register a User Defined Function (UDF) so that it can be used not only in DataFrame operations but also in SQL queries. This allows you to apply your custom Python functions directly within SQL statements executed against your Spark session.

Syntax

spark.udf.register(“registered_pyFun_for_sql”, original_pyFun, returnType)

parameter

  • registered_pyFun_for_sql: The name of the UDF, which will be used in SQL queries.
  • original_pyFun: The original Python function that contains the custom logic.
  • returnType: The return type of the UDF, which must be specified (e.g., StringType(), IntegerType()).
smaple df
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

# Define a Python function
def original_myPythonFun(name):
    return name.upper()

# Register the function as a UDF for SQL with the return type StringType
spark.udf.register("registered_pythonFun_for_sql"\
                    , original_myPythonFun\
                    , StringType()\
                )

# Use the UDF - "registered_pythonFun_for_sql" in a SQL query
result = spark.sql(\
    "SELECT Name, registered_pythonFun_for_sql(Name) AS Upper_Name \
        FROM people"\
    )
result.show()
+-------+----------+
|   Name|Upper_Name|
+-------+----------+
|  Alice|     ALICE|
|    Bob|       BOB|
|Charlie|   CHARLIE|
+-------+----------+

we can directly use SQL style with magic %sql

%sql
SELECT Name
      , registered_pythonFun_for_sql(Name) AS Upper_Name 
  FROM people
+-------+----------+
|   Name|Upper_Name|
+-------+----------+
|  Alice|     ALICE|
|    Bob|       BOB|
|Charlie|   CHARLIE|
+-------+----------+
A complex python function declares, udf register, and apply example

Sample DataFrame

#Sample DataFrame
data = [    (1, 2, 3, 9, "alpha", "beta", "gamma"),    (4, 5, 6, 8, "delta", "epsilon", "zeta")]

df = spark.createDataFrame(data, ["col1", "col2", "col3", "col9", "str1", "str2", "str3"])
df.show()
+----+----+----+----+-----+-------+-----+
|col1|col2|col3|col9| str1|   str2| str3|
+----+----+----+----+-----+-------+-----+
|   1|   2|   3|   9|alpha|   beta|gamma|
|   4|   5|   6|   8|delta|epsilon| zeta|
+----+----+----+----+-----+-------+-----+

Define the Python function to handle multiple columns and scalars

# Define the Python function to handle multiple columns and scalars

def pyFun(col1, col2, col3, col9, int1, int2, int3, str1, str2, str3):
    # Example business logic
    re_value1 = col1 * int1 + len(str1)
    re_value2 = col2 + col9 + len(str2)
    re_value3 = col3 * int3 + len(str3)
    value4 = re_value1 + re_value2 + re_value3

    # Return multiple values as a tuple
    return re_value1, re_value2, re_value3, value4

Define the return schema for the UDF

# Define the return schema for the UDF

return_schema = StructType([
    StructField("re_value1", IntegerType(), True),
    StructField("re_value2", IntegerType(), True),
    StructField("re_value3", IntegerType(), True),
    StructField("value4", IntegerType(), True)
])

Register the UDF

# register the UDF

# for SQL use this 
# spark.udf.register("pyFun_udf", pyFun, returnType=return_schema)

pyFun_udf = udf(pyFun, returnType=return_schema)

Apply the UDF to the DataFrame, using ‘lit()’ for constant values

# Apply the UDF to the DataFrame, using 'lit()' for constant values

df_with_udf = df.select(
    col("col1"),
    col("col2"),
    col("col3"),
    col("col9"),
    col("str1"),
    col("str2"),
    col("str3"),
    pyFun_udf(
        col("col1"),
        col("col2"),
        col("col3"),
        col("col9"),
        lit(10),  # Use lit() for the constant integer values
        lit(20),
        lit(30),
        col("str1"),
        col("str2"),
        col("str3")
    ).alias("result")
)

# Show the result
df_with_udf.show(truncate=False)

==output==
+----+----+----+----+-----+-------+-----+------------------+
|col1|col2|col3|col9|str1 |str2   |str3 |result            |
+----+----+----+----+-----+-------+-----+------------------+
|1   |2   |3   |9   |alpha|beta   |gamma|{15, 15, 95, 125} |
|4   |5   |6   |8   |delta|epsilon|zeta |{45, 20, 184, 249}|
+----+----+----+----+-----+-------+-----+------------------+

Access the “result”

from pyspark.sql.functions import col
df_result_re_value3 = df_with_udf.select ('col1', 'result',col('result').re_value3.alias("re_value3"))
df_result_re_value3.show()
+----+------------------+---------+
|col1|            result|re_value3|
+----+------------------+---------+
|   1| {15, 15, 95, 125}|       95|
|   4|{45, 20, 184, 249}|      184|
+----+------------------+---------+

udf register for sql using

“sample dataframe”, “declare python function”, “Define the return schema for the UDF” are the same. only register udf different.

# register the UDF

spark.udf.register("pyFun_udf", pyFun, returnType=return_schema)

# create a temporary view for SQL query
df.createOrReplaceTempView("my_table")

notebook uses magic %sql


%sql
SELECT col1, col2, col3, col9, str1, str2, str3,            pyFun_udf(col1, col2, col3, col9, 10, 20, 30, str1, str2, str3) AS result    
FROM my_table

==output==
col1	col2	col3	col9	str1	str2	str3	result
1	2	3	9	alpha	beta	gamma	{"re_value1":15,"re_value2":15,"re_value3":95,"value4":125}
4	5	6	8	delta	epsilon	zeta	{"re_value1":45,"re_value2":20,"re_value3":184,"value4":249}