PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data in multiple formats, such as CSV, JSON, Parquet, ORC and Avro files, JDBC databases, as well as Delta Lake and Hive tables.
Query Database Table
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", "employee") \
.option("user", "root") \
.option("password", "root") \
.load()
# Query table using jdbc()
# Parameters
server_name = "myserver.database.windows.net"
port = "1433"
database_name = "mydatabase"
table_name = "mytable"
username = "myusername"
password = "mypassword"
# Construct the JDBC URL
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
# Read data from the SQL database
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
# show DataFrame
df.show()
+-----+--------------+
|depid| dep|
+-----+--------------+
| 1| IT|
| 2| Sale|
| 3| Finance|
| 4|human resource|
+-----+--------------+
Query JDBC Table in Parallel with Specific Columns
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("query", "select id,age from emp where sex='M'") \
.option("numPartitions",5) \
.option("fetchsize", 20) \
.option("user", "root") \
.option("password", "root") \
.load()
df.show()
Read & Write CSV File
Read CSV File
df = spark.read.format("csv") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(["/Folder/", "/filePath2/"])
Write CSV File
df.write.format("csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/filePath/filename")
By default, if you try to write directly to a file name (e.g., name1.csv), it does not work the way you might expect: Spark does not generate a single file but a collection of part files inside a directory of that name.
path
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv
PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1)
or repartition(1)
operation before writing, which reduces the number of partitions to one.
df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")
Even so, name1.csv in "dbfs:/FileStore/name1.csv" is treated as a directory rather than a file name. PySpark writes the single file inside it with a generated name like part-00000-<id>.csv
dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv
so we need an additional step: rename the part file to name1.csv.
# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")
# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        destination_file = "dbfs:/FileStore/name1.csv"

        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break

display(dbutils.fs.ls("dbfs:/FileStore/"))

Direct Write with Custom File Name
PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:
Steps:
- Use coalesce(1) to combine all data into a single partition.
- Save the file to a temporary location.
- Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("dbfs:/FileStore/temp_folder")
# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break
# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")
# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Read & Write Parquet File
Sample data is saved at /tmp/output/people.parquet:

dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet
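For reference, here is a minimal sketch that builds the sample people DataFrame (the values match the read output shown below) and writes it to that location:
# Build the sample people DataFrame used in this section
data = [("James", "", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "", "40288", "M", 4000),
        ("Robert", "", "Williams", "42114", "M", 4000),
        ("Maria", "Anne", "Jones", "39192", "F", 4000),
        ("Jen", "Mary", "Brown", "", "F", -1)]
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data, columns)

# Write it as a Parquet dataset (a directory of part files)
df.write.mode("overwrite").parquet("/tmp/output/people.parquet")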
Read from a Parquet file
df1=spark.read.parquet("/tmp/output/people.parquet")
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| James | | Smith|36636| M| 3000|
| Michael | Rose| |40288| M| 4000|
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
Reading a Parquet file is the same as reading a CSV file; nothing special.
Write to a Parquet file
- append: appends the data from the DataFrame to the existing file if the destination file already exists. If the destination file does not exist, it creates a new Parquet file in the specified location.
  df.write.mode("append").parquet("path/to/parquet/file")
- overwrite: overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.
  df.write.mode("overwrite").parquet("path/to/parquet/file")
- ignore: if the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.
  df.write.mode("ignore").parquet("path/to/parquet/file")
- Create a partitioned Parquet file:
  df.write.partitionBy("gender","salary").mode("overwrite").parquet("/tmp/output/people2.parquet")

Read & write Delta Table in PySpark
Read from a Delta Table
Read from a Registered Delta Table (in the Catalog)
df = spark.read.table("default.dim_dep")
df.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID| dep| StartDate| EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
| 1| 1001| IT|2019-01-01|9999-12-31| true|
| 2| 1002|Sales|2019-01-01|9999-12-31| true|
| 3| 1003| HR|2019-01-01|9999-12-31| true|
+--------+-----+-----+----------+----------+----------+
Read from a Delta Table Stored in a Directory (Path-Based)
df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")
df_delta_table.show()
+--------+-----+-----+----------+----------+----------+
|Surrokey|depID| dep| StartDate| EndDate|IsActivity|
+--------+-----+-----+----------+----------+----------+
| 1| 1001| IT|2019-01-01|9999-12-31| true|
| 2| 1002|Sales|2019-01-01|9999-12-31| true|
| 3| 1003| HR|2019-01-01|9999-12-31| true|
+--------+-----+-----+----------+----------+----------+
Write to a Delta Table
Append to a Delta Table
# Appends to the Delta table
df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")
Overwrite a Delta Table
# Overwrites the Delta table
df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")
Create or Overwrite a Registered Delta Table
# Overwrites the table in the catalog
df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep")
Append to a Registered Delta Table:
# Appends to the table in the catalog
df.write.format("delta").mode("append").saveAsTable("default.dim_dep")
Merge Operation (Upsert)
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit
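# Setup (assumed for this sketch): load the existing Delta table as the merge target
# and build a source DataFrame of new/updated rows. The path and rows are examples only.
target_table_path = "dbfs:/tmp/target_delta"   # hypothetical path
target_table = DeltaTable.forPath(spark, target_table_path)
source_df = spark.createDataFrame(
    [(1, "IT", "2025-01-01"), (4, "Marketing", "2025-01-01")],
    ["id", "name", "date"]
)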
# Perform the merge operation
target_table.alias("t").merge(
source_df.alias("s"),
"t.id = s.id" # Join condition: match rows based on `id`
).whenMatchedUpdate(
set={
"name": "s.name", # Update `name` column
"date": "s.date" # Update `date` column
}
).whenNotMatchedInsert(
values={
"id": "s.id", # Insert `id`
"name": "s.name", # Insert `name`
"date": "s.date" # Insert `date`
}
).execute()
# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
- Target table (target_table): the Delta table is loaded with DeltaTable.forPath. It holds the existing data to which updates or inserts will be applied.
- Source DataFrame (source_df): contains the new or updated records.
- Join condition ("t.id = s.id"): rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
- whenMatchedUpdate: if a matching row is found, the name and date columns in the target table are updated.
- whenNotMatchedInsert: if no matching row is found, the new record from the source DataFrame is inserted into the target table.
- execute(): runs the merge operation, applying the updates and inserts.
- Result verification: after the merge, the updated Delta table is read back and displayed.
Schema Evolution
df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
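For instance, a minimal sketch (the new_col column is hypothetical, and df is assumed to be the DataFrame read earlier in this section) where the appended DataFrame carries an extra column that mergeSchema adds to the table schema:
from pyspark.sql.functions import lit

# Hypothetical: df_new has one extra column compared with the existing Delta table
df_new = df.withColumn("new_col", lit("n/a"))

# With mergeSchema enabled, the extra column is added to the table schema on append
df_new.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("dbfs:/mnt/dim/")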
Read & Write JSON file
Read a simple JSON file
# Read a simple JSON file into dataframe
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}
df_simple = spark.read.format("json") \
.option("multiline","True")\
.load("dbfs:/FileStore/simple_json.json")
df_simple.printSchema()
root
|-- City: string (nullable = true)
|-- RecordNumber: long (nullable = true)
|-- State: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- Zipcode: long (nullable = true)
df_simple.show()
+------------+------------+-----+-----------+-------+
| City|RecordNumber|State|ZipCodeType|Zipcode|
+------------+------------+-----+-----------+-------+
|BDA SAN LUIS| 10| PR| STANDARD| 709|
+------------+------------+-----+-----------+-------+
Pay attention to .option("multiline", "true"). Since this JSON file spans multiple lines (see the sample data above), reading it without this option will still load, but the resulting DataFrame is unusable; as soon as you call show() you get the error below.
df_simple = spark.read.format("json").load("dbfs:/FileStore/simple_json.json")
df_simple.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
Reading from Multiline JSON (JSON Array) File
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
df_jsonarray_simple = spark.read\
.format("json")\
.option("multiline", "true")\
.load("dbfs:/FileStore/jsonArrary.json")
df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
| City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR| 2| PR| STANDARD| 704|
| BDA SAN LUIS| 10| PR| STANDARD| 709|
+-------------------+------------+-----+-----------+-------+
Read a complex JSON file

df_complexjson=spark.read\
.option("multiline","true")\
.json("dbfs:/FileStore/jsonArrary2.json")
df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
id | 0001
type | donut
name | Cake
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}]
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
id | 0002
type | donut
name | Raised
ppu | 0.55
batters | {[{1001, Regular}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
id | 0003
type | donut
name | Old Fashioned
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}]}
topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]
Reading from Multiple files at a time
# Read multiple files
df2 = spark.read.json(
['resources/zipcode1.json','resources/zipcode2.json'])
Reading all JSON files from a folder
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
Reading files with a user-specified custom schema
# Define custom schema (import the required types first)
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, BooleanType)

schema = StructType([
    StructField("RecordNumber", IntegerType(), True),
    StructField("Zipcode", IntegerType(), True),
    StructField("ZipCodeType", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("LocationType", StringType(), True),
    StructField("Lat", DoubleType(), True),
    StructField("Long", DoubleType(), True),
    StructField("Xaxis", IntegerType(), True),
    StructField("Yaxis", DoubleType(), True),
    StructField("Zaxis", DoubleType(), True),
    StructField("WorldRegion", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("LocationText", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Decommisioned", BooleanType(), True),
    StructField("TaxReturnsFiled", StringType(), True),
    StructField("EstimatedPopulation", IntegerType(), True),
    StructField("TotalWages", IntegerType(), True),
    StructField("Notes", StringType(), True)
])
df_with_schema = spark.read.schema(schema) \
.json("resources/zipcodes.json")
df_with_schema.printSchema()
Reading File using PySpark SQL
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()
Write to JSON
Options
- path: specifies the path where the JSON files will be saved.
- mode: specifies the behavior when writing to an existing directory.
- compression: specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy").
  df2.write.option("compression", "gzip")
- dateFormat: specifies the format for date columns.
  df.write.option("dateFormat", "yyyy-MM-dd")
- timestampFormat: specifies the format for timestamp columns.
  df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
- lineSep: specifies the character sequence to use as a line separator between JSON objects, e.g. \n (Unix/Linux newline) or \r\n (Windows newline).
  df.write.option("lineSep", "\r\n")
- encoding: specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings.
  df.write.option("encoding", "UTF-8")
df2.write \
    .format("json") \
    .option("compression", "gzip") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
    .option("encoding", "UTF-8") \
    .option("lineSep", "\r\n") \
    .save("dbfs:/FileStore/output_json")
Modes
- Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
- Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
- Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
- Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
Read & Write SQL Server
Read from SQL Server
server_name = "mainri-sqldb.database.windows.net"
port=1433
username="my login name"
password="my login password"
database_name="mainri-sqldb"
table_name="dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
connection_properties = {
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df.show()
+-----+--------------+
|depid| dep|
+-----+--------------+
| 1| IT|
| 2| Sale|
| 3| Finance|
| 4|human resource|
+-----+--------------+
Alternative way:
df1=spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.load()
Select Specific Columns to Read
The example above reads the entire table into a PySpark DataFrame. If you only need specific columns or rows, pass a SQL statement through the query option instead of dbtable.
df1=spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("databaseName",database_name) \
.option("query", "select * from [dbo].[dep] where depid>3") \
.option("user", username) \
.option("password", password) \
.load()
df1.show()
+-----+--------------+
|depid| dep|
+-----+--------------+
| 4|human resource|
| 5| admin|
+-----+--------------+
Write to a table
Write modes:
Append: mode("append") appends the rows to the existing SQL Server table.
# We defined these variables earlier; shown here again
server_name = "mainri-sqldb.database.windows.net"
port = 1433
username = "my login name"
password = "my login password"
database_name = "mainri-sqldb"
table_name = "dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
# append the rows to the existing SQL Server table.
df_newrow.show()
+-----+-----+
|depid| dep|
+-----+-----+
| 5|admin|
+-----+-----+
df_newrow.write \
.format("jdbc") \
.mode("append") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
+-----+--------------+
|depid| dep|
+-----+--------------+
| 1| IT|
| 2| Sale|
| 3| Finance|
| 4|human resource|
| 5| admin| <- newly added row
+-----+--------------+
Overwrite
mode("overwrite") drops the table if it already exists and re-creates a new one without indexes. Use option("truncate", "true") to truncate the table instead, which retains the table definition and indexes (a sketch with this option follows the example below).
df_newrow.write \
.format("jdbc") \
.mode("overwrite") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
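As a sketch of the truncate variant mentioned above (same connection variables as before), adding option("truncate", "true") keeps the existing table and its indexes while replacing the rows:
# Sketch: overwrite the rows but keep the existing table definition and indexes
df_newrow.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("truncate", "true") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()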
Read & Write MySQL
Read from MySQL
# Read from MySQL Table
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("dbtable", "employee") \
.option("user", "root") \
.option("password", "root") \
.load()
Select Specific Columns to Read
# Read from MySQL Table
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("query", "select id,age from employee where gender='M'") \
.option("numPartitions",4) \
.option("fetchsize", 20) \
.option("user", "root") \
.option("password", "root") \
.load()
Write to MySQL
Some points to note while writing:
- To re-write an existing table, use mode("overwrite"). By default this drops the table if it already exists and re-creates a new one without indexes.
- To retain the indexes, use option("truncate", "true").
- The transaction isolation level for writes can be set with the isolationLevel option, for example option("isolationLevel", "READ_COMMITTED"); the default is READ_UNCOMMITTED. (A sketch using these options follows the write example below.)
- The dbtable option specifies the name of the database table you want to read data from or write data to.
# Write to MySQL Table
sampleDF.write \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("dbtable", "employee") \
.option("user", "root") \
.option("password", "root") \
.save()
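As a sketch combining the options above (these are standard Spark JDBC options; the table and credentials are the same placeholders used throughout this section):
# Sketch: overwrite by truncating the existing table (keeps indexes),
# with an explicit transaction isolation level for the write
sampleDF.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("truncate", "true") \
    .option("isolationLevel", "READ_COMMITTED") \
    .option("user", "root") \
    .option("password", "root") \
    .save()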
Read JDBC in Parallel
With the numPartitions option, the PySpark jdbc() method can read a database table in parallel. This option is used for both reading and writing: it sets the maximum number of partitions that can be used for parallelism, which also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to this limit by calling coalesce(numPartitions) before writing. (A sketch showing range-based partitioning follows the example below.)
# Read Table in Parallel
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("dbtable","employee") \
.option("numPartitions",5) \
.option("user", "root") \
.option("password", "root") \
.load()
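Note that with dbtable alone, numPartitions mainly caps the number of concurrent connections; to actually split the read across partitions, the JDBC source also needs partitionColumn, lowerBound, and upperBound. A sketch, assuming employee has a numeric id column (the bounds are examples):
# Sketch: range-partitioned parallel read on an assumed numeric id column
df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100000") \
    .option("numPartitions", 5) \
    .option("user", "root") \
    .option("password", "root") \
    .load()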
Select columns with where clause
# Select columns with where clause
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("query","select id,age from employee where gender='M'") \
.option("numPartitions",5) \
.option("user", "root") \
.option("password", "root") \
.load()
Using fetchsize with numPartitions to Read
The fetchsize option specifies how many rows to fetch per round trip; many JDBC drivers default to a small value (for example, 10 rows for Oracle). The fetch size can significantly affect JDBC driver performance, but do not set it to a very large number or you may run into memory issues.
# Using fetchsize
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("query","select id,age from employee where gender='M'") \
.option("numPartitions",5) \
.option("fetchsize", 20) \
.option("user", "root") \
.option("password", "root") \
.load()
Read & Write Delta Table, Catalog Table, Hive Table
Read a Delta table, Catalog table, or Hive table
# Read Hive table
df = spark.sql("select * from emp.employee")
df.show()
# Read Hive table
df = spark.read.table("employee")
df.show()
# Read delta table
df = spark.sql("select * from delta.`<table path>`")
df.show()
Caution: the table path must be wrapped in backticks (`), not quotes.
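For example, reusing the path from the earlier path-based read (note the backticks around the path):
df = spark.sql("select * from delta.`dbfs:/mnt/dim/`")
df.show()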
Write / Save
To save a PySpark DataFrame to a Hive table, use the saveAsTable() function, or use SQL CREATE/INSERT statements on top of a temporary view.
# Create Hive Internal table
sampleDF.write.mode('overwrite') \
.saveAsTable("emp.employee")
Using SQL with a temporary view
# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")
# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")
# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")
# Insert into sampleTable using the sampleView.
spark.sql("INSERT INTO TABLE ct.sampleTable SELECT * FROM sampleView")
# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
(remove all space from the email account 😊)