In PySpark, we can read from and write to CSV files using DataFrameReader
and DataFrameWriter with the csv method. Here’s a guide on how to work with CSV files in PySpark:
Reading CSV Files in PySpark
Syntax
df = spark.read.format("csv").options(**options).schema(my_schema).load(file_location)
format
- csv
- parquet
- orc
- json
- avro
options
- header = "true" or "false"
- inferSchema = "true" or "false"
- sep = "," (or any other delimiter, e.g. "\t" or "|")
- nullValue = "NULL" (string that should be read as null)
file_location
- load(path1)
- load([path1, path2, …])
- load(folder)
schema
- define a schema (a StructType or a DDL string)
- pass it to the reader with .schema(my_schema)
define a schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # column1: integer, nullable
    StructField("column2", StringType(), True),    # column2: string, nullable
    StructField("column3", StringType(), False)    # column3: string, non-nullable
])
# Or the same schema as a simple DDL string
schema = "column1 INT, column2 STRING, column3 STRING"
Example
Read CSV file with header, infer schema, and specify null value
# Read a CSV file with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")         # Use the first row as the header
      .option("inferSchema", "true")    # Automatically infer the schema
      .option("sep", ",")               # Specify the delimiter
      .option("nullValue", "NULL")      # Define a string representation of null
      .load("path/to/input_file.csv"))  # Load the file
# Read multiple CSV files with header, infer schema, and specify null value
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NULL") \
    .load(["path/file1.csv", "path/file2.csv", "path/file3.csv"])
# Read all CSV files in a folder with header, infer schema, and specify null value
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NULL") \
    .load("/path_folder/")
When you want to read multiple files into a single DataFrame but their schemas differ, load the files into separate DataFrames first and then merge them in an additional step, as sketched below.
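For instance, a minimal sketch of that merge, assuming two placeholder files whose columns only partially overlap (unionByName with allowMissingColumns=True requires Spark 3.1 or later):
df1 = spark.read.format("csv").option("header", "true").load("path/file1.csv")
df2 = spark.read.format("csv").option("header", "true").load("path/file2.csv")
# Align columns by name; columns missing on one side are filled with nulls
merged_df = df1.unionByName(df2, allowMissingColumns=True)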
Writing CSV Files in PySpark
Syntax
df.write.format("csv").options(options).save("path/to/output_directory")
Example
# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("path/to/output_directory")
# Write DataFrame to a CSV file with header, partitioning, and compression
df.write.format("csv") \
.option("header", "true") \ # Write the header
.option("compression", "gzip") \ # Use gzip compression
.partitionBy("year", "month") \ # Partition the output by specified columns
.mode("overwrite") \ # Overwrite existing data
.save("path/to/output_directory") # Specify output directory
By default, if you try to write directly to a single file name (e.g., name1.csv), it conflicts, because Spark does not generate one file but a collection of part files inside a directory with that name. For example, the output path contains:
dbfs:/FileStore/name1.csv/_SUCCESS
dbfs:/FileStore/name1.csv/_committed_7114979947112568022
dbfs:/FileStore/name1.csv/_started_7114979947112568022
dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv
PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by calling coalesce(1) or repartition(1) before writing, which reduces the number of partitions to one.
df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")
Even then, name1.csv in “dbfs:/FileStore/name1.csv” is treated as a directory rather than a file name, and PySpark writes the single file inside it with a generated name like part-00000-<id>.csv:
dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv
so we need an additional step: rename that part file to name1.csv.
# List all files in the output directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")
# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        break
# Move the part file out under a temporary name, drop the directory,
# then rename the single file to name1.csv
dbutils.fs.mv(source_file, "dbfs:/FileStore/name1_tmp.csv")
dbutils.fs.rm("dbfs:/FileStore/name1.csv", True)
dbutils.fs.mv("dbfs:/FileStore/name1_tmp.csv", "dbfs:/FileStore/name1.csv")
display(dbutils.fs.ls("dbfs:/FileStore/"))

Direct Write with Custom File Name
PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:
Steps:
- Use coalesce(1) to combine all data into a single partition.
- Save the output to a temporary location.
- Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")
# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break
# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")
# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Please do not hesitate to contact me if you have any questions at William . chen @mainri.ca
(remove all spaces from the email address 😊)