PySpark Data Sources

PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data across many formats: files such as CSV, JSON, Parquet, and ORC; databases via JDBC; and more advanced formats such as Avro, Delta Lake, and Hive tables.
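
Most of these sources are read and written through the same DataFrameReader / DataFrameWriter pattern. As a minimal sketch of that pattern (the format name and paths below are illustrative placeholders, not real locations):

# Generic read: pick a format and point it at a path
df = spark.read.format("parquet").load("dbfs:/FileStore/some_input_folder/")

# Generic write: pick a format, a save mode, and a destination
df.write.format("parquet").mode("overwrite").save("dbfs:/FileStore/some_output_folder/")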

Query Database Table
# Query a table using format("jdbc") with options
# (replace the {server_name}, {port}, and {database_name} placeholders with your values)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
# Query table using jdbc()
# Parameters
server_name = "myserver.database.windows.net"
port = "1433"
database_name = "mydatabase"
table_name = "mytable"
username = "myusername"
password = "mypassword"

# Construct the JDBC URL
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Read data from the SQL database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
# show DataFrame
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

Query a JDBC Table in Parallel, Selecting Specific Columns

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("query", "select id,age from emp where sex='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

df.show()
Read & Write CSV File

Read CSV File

df = spark.read.format("csv")\
.options(delimiter=',') \
.options(inferSchema='True')\
.options(header='True')\
.load("/Folder/, /filePath2/")

Write CSV File

df = spark.write.format("csv")\
.options(delimiter=',') \
.options(inferSchema='True')\
.options(header='True')\
.mode("overwrite")
.save("/filePath/filename")

By default, if you try to write directly to a single file name (e.g., name1.csv), it conflicts with how Spark works: Spark does not produce one file but a collection of part files inside a directory.

path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

At this point, name1.csv in "dbfs:/FileStore/name1.csv" is treated as a directory rather than a file name. PySpark writes the single part file inside it with an auto-generated name like part-00000-<id>.csv:

dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

We still need one additional step: rename the part file to name1.csv.

# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        destination_file = "dbfs:/FileStore/name1.csv"
        
        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break


display(dbutils.fs.ls("dbfs:/FileStore/"))

Direct Write with Custom File Name

PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

Steps:

  1. Use coalesce(1) to combine all data into a single partition.
  2. Save the file to a temporary location.
  3. Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Read & Write Parquet File

Sample data is saved at /tmp/output/people.parquet:

dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet

Read from Parquet

# sample data saved at /tmp/output/people.parquet
df1 = spark.read.parquet("/tmp/output/people.parquet")
df1.show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

Reading a Parquet file works the same way as reading a CSV file; nothing special is required.

Write to Parquet

  • append: appends the data from the DataFrame to the existing Parquet files at the destination, if they already exist. If the destination does not exist, it creates a new Parquet file in the specified location.

    df.write.mode("append").parquet("path/to/parquet/file")
  • overwrite: overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.

    df.write.mode("overwrite").parquet("path/to/parquet/file")
  • ignore: if the destination Parquet file already exists, this mode does nothing and does not write the DataFrame. If the file does not exist, it creates a new Parquet file.

    df.write.mode("ignore").parquet("path/to/parquet/file")
  • Create a partitioned Parquet file (a read-back sketch follows this list):

    df.write.partitionBy("gender", "salary").mode("overwrite").parquet("/tmp/output/people2.parquet")
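
A partitioned Parquet directory can be read back as a whole or filtered on its partition columns, in which case Spark only scans the matching sub-directories. A minimal sketch, using the path from the example above (the filter value is illustrative):

# Read the whole partitioned dataset; gender and salary come back as regular columns
df_part = spark.read.parquet("/tmp/output/people2.parquet")

# Filtering on a partition column lets Spark prune to the matching partition folders
df_part.filter(df_part.gender == "M").show()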

Read & write Delta Table in PySpark

Read from a Delta Table

Read from a Registered Delta Table (in the Catalog)
df = spark.read.table("default.dim_dep")

df.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+
Read from a Delta Table Stored in a Directory (Path-Based)
df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")

df_delta_table.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
|       1| 1001|   IT|2019-01-01|9999-12-31|      true|
|       2| 1002|Sales|2019-01-01|9999-12-31|      true|
|       3| 1003|   HR|2019-01-01|9999-12-31|      true|
+--------+-----+-----+----------+----------+----------+

Write to a Delta Table

Append to a Delta Table
# Appends to the Delta table
df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")  
Overwrite a Delta Table
# Overwrites the Delta table
df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")  
Create or Overwrite a Registered Delta Table
# Overwrites the table in the catalog
df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep") 
Append to a Registered Delta Table:
# Appends to the table in the catalog
df.write.format("delta").mode("append").saveAsTable("default.dim_dep")  

Merge Operation (Upsert)

from delta.tables import DeltaTable

# Path of the existing Delta table that is the target of the merge (placeholder path)
target_table_path = "path/to/delta/table"

# Load the target Delta table; source_df is assumed to be a DataFrame
# holding the new or updated records (with id, name, and date columns)
target_table = DeltaTable.forPath(spark, target_table_path)

# Perform the merge operation
target_table.alias("t").merge(
    source_df.alias("s"),
    "t.id = s.id"  # Join condition: match rows based on `id`
).whenMatchedUpdate(
    set={
        "name": "s.name",  # Update `name` column
        "date": "s.date"   # Update `date` column
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",      # Insert `id`
        "name": "s.name",  # Insert `name`
        "date": "s.date"   # Insert `date`
    }
).execute()

# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
  1. Target Table (target_table):
    • The Delta table is loaded using DeltaTable.forPath.
    • This table contains existing data where updates or inserts will be applied.
  2. Source DataFrame (source_df):
    • This DataFrame contains new or updated records.
  3. Join Condition ("t.id = s.id"):
    • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
  4. whenMatchedUpdate:
    • If a matching row is found, update the name and date columns in the target table.
  5. whenNotMatchedInsert:
    • If no matching row is found, insert the new record from the source DataFrame into the target table.
  6. execute():
    • Executes the merge operation, applying updates and inserts.
  7. Result Verification:
    • After the merge, the updated Delta table is read and displayed.

Schema Evolution

When the incoming DataFrame contains columns that the existing Delta table does not have, setting mergeSchema to true lets the append add those columns to the table schema instead of failing.

df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
Read & Write JSON file

Read a simple JSON file

# Read a simple JSON file into dataframe
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}

df_simple = spark.read.format("json") \
.option("multiline","True")\
.load("dbfs:/FileStore/simple_json.json")

df_simple.printSchema()
root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)

df_simple.show()
+------------+------------+-----+-----------+-------+
|        City|RecordNumber|State|ZipCodeType|Zipcode|
+------------+------------+-----+-----------+-------+
|BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+------------+------------+-----+-----------+-------+

Pay attention to .option("multiline", "true"). Since this JSON file spans multiple lines (see the sample data above), the load still succeeds without this option, but the resulting DataFrame is unusable; as soon as you call show() you get the error below.

df_simple = spark.read.format("json").load("dbfs:/FileStore/simple_json.json")
df_simple.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).

Reading from Multiline JSON (JSON Array) File

[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]
df_jsonarray_simple = spark.read\
    .format("json")\
    .option("multiline", "true")\
    .load("dbfs:/FileStore/jsonArrary.json")

df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

Read Complex JSON

df_complexjson=spark.read\
    .option("multiline","true")\
    .json("dbfs:/FileStore/jsonArrary2.json")

df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0001                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Cake                                                                                                                                      
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}                                                           
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}] 
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0002                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Raised                                                                                                                                    
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}]}                                                                                                                       
 topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]                                                           
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
 id      | 0003                                                                                                                                      
 type    | donut                                                                                                                                     
 name    | Old Fashioned                                                                                                                             
 ppu     | 0.55                                                                                                                                      
 batters | {[{1001, Regular}, {1002, Chocolate}]}                                                                                                    
 topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]                                                                          

Reading from Multiple files at a time

# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json','resources/zipcode2.json'])

Reading All JSON Files from a Folder

# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")

Reading files with a user-specified custom schema

# Define a custom schema (the type classes must be imported first)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()

Reading File using PySpark SQL

spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Write to JSON

Options

  • path: Specifies the path where the JSON files will be saved.
  • mode: Specifies the behavior when writing to an existing directory.
  • compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy").
    df2.write.option("compression", "gzip")
  • dateFormat: Specifies the format for date columns.
    df.write.option("dateFormat", "yyyy-MM-dd")
  • timestampFormat: Specifies the format for timestamp columns.
    df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  • lineSep: Specifies the character sequence to use as a line separator between JSON objects: \n (Unix/Linux newline) or \r\n (Windows newline).
    df.write.option("lineSep", "\r\n")
  • encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings.
    df.write.option("encoding", "UTF-8")

df2.write.format("json") \
    .option("compression", "gzip") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
    .option("encoding", "UTF-8") \
    .option("lineSep", "\r\n") \
    .save("dbfs:/FileStore/output_json")

modes

  1. Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
  2. Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
  3. Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
  4. Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
Read & Write SQL Server

Read from SQL Server

server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"

jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
+-----+--------------+

Alternative way

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .load()

Select Specific Columns to Read

In the above example, the entire table is read into a PySpark DataFrame. Sometimes you do not need the whole table; to read specific columns or rows, pass a SQL statement with the query option instead of dbtable.

df1=spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("databaseName",database_name) \
  .option("query", "select * from [dbo].[dep] where depid>3") \
  .option("user", username) \
  .option("password", password) \
  .load()

df1.show()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    4|human resource|
|    5|         admin|
+-----+--------------+

Write to a Table

Write modes:

Append:
Use mode("append") to append the rows to the existing SQL Server table.

# these variables were defined earlier; shown here again
server_name = "mainri-sqldb.database.windows.net"
port = 1433
username = "my login name"
password = "my login password"
database_name = "mainri-sqldb"
table_name = "dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

# append the rows to the existing SQL Server table.

df_newrow.show()
+-----+-----+
|depid|  dep|
+-----+-----+
|    5|admin|
+-----+-----+

df_newrow.write \
  .format("jdbc") \
  .mode("append") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
+-----+--------------+
|depid|           dep|
+-----+--------------+
|    1|            IT|
|    2|          Sale|
|    3|       Finance|
|    4|human resource|
|    5|         admin| <- new added row
+-----+--------------+

Overwrite

mode("overwrite") drops the table if it already exists and re-creates it without indexes. Use option("truncate", "true") to truncate and reuse the existing table instead, which retains the indexes (see the sketch after the code below).

df_newrow.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
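
A minimal sketch of the truncate variant mentioned above, reusing the same connection variables; this illustrates the option rather than being a verified end-to-end script:

# Overwrite by truncating the existing table instead of dropping it,
# so the table definition and indexes are preserved
df_newrow.write \
  .format("jdbc") \
  .mode("overwrite") \
  .option("truncate", "true") \
  .option("url", jdbc_url) \
  .option("dbtable", table_name) \
  .option("user", username) \
  .option("password", password) \
  .save()
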
Read & Write MySQL

Read from MySQL

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Select Specific Columns to Read

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query", "select id,age from employee where gender='M'") \
    .option("numPartitions",4) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Write to MySQL

Some points to note while writing

  • To re-write the existing table, use mode("overwrite"). By default this drops the table if it already exists and re-creates it without indexes.
  • To retain the indexes, use option("truncate", "true") so the existing table is truncated and reused instead of dropped.
  • The JDBC writer's isolationLevel option controls the transaction isolation level used for writes (it defaults to READ_UNCOMMITTED); change it with, for example, option("isolationLevel", "READ_COMMITTED").
  • The dbtable option specifies the name of the database table you want to read data from or write data to.
# Write to MySQL Table
sampleDF.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/emp") \
  .option("dbtable", "employee") \
  .option("user", "root") \
  .option("password", "root") \
  .save()
Read JDBC in Parallel

With the numPartitions option, the PySpark JDBC reader can read a database table in parallel. This option is used for both reading and writing.

numPartitions is the maximum number of partitions that can be used for parallelism in table reading and writing, and it also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to this limit by calling coalesce(numPartitions) before writing. For reads, numPartitions on its own does not split the query; partition boundaries also have to be supplied (see the sketch after the code below).

# Read Table in Parallel
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable","employee") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
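
To actually split the read across multiple connections, Spark needs to know how to carve the table into ranges, which is done with partitionColumn together with lowerBound, upperBound, and numPartitions. A minimal sketch, assuming the employee table has a numeric id column whose rough range is known (the bounds below are illustrative):

# Read the table in parallel by splitting on a numeric column:
# - partitionColumn: numeric column to split on (assumed to exist)
# - lowerBound / upperBound: approximate min/max of that column (illustrative values)
# - numPartitions: number of parallel queries / JDBC connections
df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 10000) \
    .option("numPartitions", 5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()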

Select columns with where clause

# Select columns with where clause
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Using fetchsize with numPartitions to Read

The fetchsize option specifies how many rows to fetch from the database per round trip. Some JDBC drivers default to a low fetch size (Oracle, for example, defaults to 10 rows), which hurts read performance, so increasing it usually helps. Do not set it to a very large number, though, as you might run into issues.

# Using fetchsize
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("query","select id,age from employee where gender='M'") \
    .option("numPartitions",5) \
    .option("fetchsize", 20) \
    .option("user", "root") \
    .option("password", "root") \
    .load()
Read & Write delta table, Catalog, Hive Table

Read delta table, Catalog, Hive Table

# Read Hive table
df = spark.sql("select * from emp.employee")
df.show()
# Read Hive table
df = spark.read.table("employee")
df.show()
# Read Delta table by path
df = spark.sql("select * from delta.`/path/to/delta/table`")
df.show()

Caution: the table path must be wrapped in backticks (`) after the delta. prefix, with no extra spaces inside the backticks.

Write / Save

To save a PySpark DataFrame to a Hive table, use the saveAsTable() function, or create a temporary view and run SQL CREATE/INSERT statements on top of it.

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

Using SQL and a temporary view

# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView. 
spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")

# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()


Overview of Commonly Used Unity Catalog and Spark SQL Management Commands

A summary of frequently used Unity Catalog and Spark SQL management commands, organized by category.

Catalog Management
  • SHOW CATALOGS: lists all available catalogs. Example: SHOW CATALOGS;

Schema Management
  • SHOW SCHEMAS IN <catalog_name>: lists schemas (databases) within a catalog. Example: SHOW SCHEMAS IN main;
  • DESCRIBE SCHEMA <catalog_name>.<schema_name>: provides metadata about a specific schema. Example: DESCRIBE SCHEMA main.default;

Table Management
  • SHOW TABLES IN <catalog_name>.<schema_name>: lists all tables in a schema. Example: SHOW TABLES IN main.default;
  • DESCRIBE TABLE <catalog_name>.<schema_name>.<table_name>: displays metadata about a specific table. Example: DESCRIBE TABLE main.default.sales_data;
  • SHOW PARTITIONS <catalog_name>.<schema_name>.<table_name>: lists partitions of a partitioned table. Example: SHOW PARTITIONS main.default.sales_data;
  • SHOW COLUMNS IN <catalog_name>.<schema_name>.<table_name>: lists all columns of a table, including their data types. Example: SHOW COLUMNS IN main.default.sales_data;
  • DROP TABLE <catalog_name>.<schema_name>.<table_name>: deletes a table from the catalog. Example: DROP TABLE main.default.sales_data;

Database Management
  • SHOW DATABASES: lists all databases (schemas) in the environment. Example: SHOW DATABASES;
  • DESCRIBE DATABASE <database_name>: provides metadata about a specific database. Example: DESCRIBE DATABASE default;

Data Querying
  • SELECT * FROM <catalog_name>.<schema_name>.<table_name>: queries data from a table. Example: SELECT * FROM main.default.sales_data WHERE region = 'West';

Table Creation
  • CREATE TABLE <catalog_name>.<schema_name>.<table_name> (<columns>): creates a managed table in Unity Catalog. Example: CREATE TABLE main.default.sales_data (id INT, region STRING, amount DOUBLE);
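
Each of these statements can also be issued from PySpark through spark.sql(). A minimal sketch, using the illustrative catalog, schema, and table names from the list above:

# Run catalog-management SQL from PySpark and display the results
spark.sql("SHOW SCHEMAS IN main").show()
spark.sql("SHOW TABLES IN main.default").show()
spark.sql("DESCRIBE TABLE main.default.sales_data").show(truncate=False)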


Read table from Unity Catalog and write table to Unity Catalog

To read from and write to Unity Catalog in PySpark, you typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name.

Reading from Unity Catalog

To read a table from Unity Catalog, specify the table’s full path:

# Reading a table
df = spark.read.table("catalog.schema.table")
df.show()

# Using Spark SQL
df = spark.sql("SELECT * FROM catalog.schema.table")

Writing to Unity Catalog

To write data to Unity Catalog, you specify the table name in the saveAsTable method:

# Writing a DataFrame to a new table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog.schema.new_table")

Options for Writing to Unity Catalog:

  • format: Set to "delta" for Delta Lake tables, as Unity Catalog uses Delta format.
  • mode: Options include overwrite, append, ignore, and error.

Example: Read, Transform, and Write Back to Unity Catalog

# Read data from a Unity Catalog table
df = spark.read.table("catalog_name.schema_name.source_table")

# Perform transformations
transformed_df = df.filter(df["column_name"] > 10)

# Write transformed data back to a different table
transformed_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog_name.schema_name.target_table")

Comparison of Delta, JSON, and CSV Reads/Writes

Delta
  • Unity Catalog: read with df = spark.read.table("catalog.schema.table"); write with df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.table"). Unity Catalog natively supports Delta with schema enforcement and versioning.
  • Blob/ADLS: read with df = spark.read.format("delta").load("path/to/delta/folder"); write with df.write.format("delta").mode("overwrite").save("path/to/delta/folder"). Requires the Delta Lake library; supports ACID transactions and time-travel capabilities.

JSON
  • Unity Catalog: not directly supported; JSON typically needs to be converted to Delta format (or read as a temporary table) before writing to Unity Catalog.
  • Blob/ADLS: read with df = spark.read.json("path/to/json/files"); write with df.write.mode("overwrite").json("path/to/json/folder"). Simple structure, no schema enforcement by default; ideal for semi-structured data.

CSV
  • Unity Catalog: not directly supported; CSV files should be imported as Delta tables or temporary views, i.e. converted to Delta format for compatibility with Unity Catalog.
  • Blob/ADLS: read with df = spark.read.option("header", True).csv("path/to/csv/files"); write with df.write.option("header", True).mode("overwrite").csv("path/to/csv/folder"). Lacks built-in schema enforcement; additional steps are needed for ACID or schema evolution.

Detailed Comparison and Notes:

  1. Unity Catalog
    • Delta: Unity Catalog fully supports Delta format, allowing for schema evolution, ACID transactions, and built-in security and governance.
    • JSON and CSV: To use JSON or CSV in Unity Catalog, convert them into Delta tables or load them as temporary views before making them part of Unity’s governed catalog (a conversion sketch follows this list). This is because Unity Catalog enforces structured data formats with schema definitions.
  2. Blob Storage & ADLS (Azure Data Lake Storage)
    • Delta: Blob Storage and ADLS support Delta tables if the Delta Lake library is enabled. Delta on Blob or ADLS retains most Delta features but may lack some governance capabilities found in Unity Catalog.
    • JSON & CSV: Both Blob and ADLS provide support for JSON and CSV formats, allowing flexibility with semi-structured data. However, they do not inherently support schema enforcement, ACID compliance, or governance features without Delta Lake.
  3. Delta Table Benefits:
    • Schema Evolution and Enforcement: Delta enables schema evolution, essential in big data environments.
    • Time Travel: Delta provides versioning, allowing access to past versions of data.
    • ACID Transactions: Delta ensures consistency and reliability in large-scale data processing.
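
A minimal sketch of that JSON/CSV-to-Delta conversion, assuming illustrative source paths and illustrative Unity Catalog table names:

# Read a CSV file from Blob/ADLS (illustrative path)
df_csv = spark.read.option("header", True).csv("abfss://container@account.dfs.core.windows.net/raw/sales/")

# Register it in Unity Catalog as a Delta table (illustrative three-part name)
df_csv.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.sales_data")

# The same pattern applies to JSON
df_json = spark.read.option("multiline", True).json("abfss://container@account.dfs.core.windows.net/raw/zipcodes/")
df_json.write.format("delta").mode("append").saveAsTable("main.default.zipcodes")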

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)