In PySpark, we can read from and write to CSV files using DataFrameReader and DataFrameWriter with the csv method. Here's a guide on how to work with CSV files in PySpark:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # column1 is Integer, nullable
    StructField("column2", StringType(), True),    # column2 is String, nullable
    StructField("column3", StringType(), False)    # column3 is String, non-nullable
])
# or the simpler DDL string format (equivalent column definitions)
schema = "column1 INT, column2 STRING, column3 STRING"
Example
Read CSV file with header, infer schema, and specify null value
# Read a CSV file with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")        # use the first row as the header
      .option("inferSchema", "true")   # automatically infer the schema
      .option("sep", ",")              # specify the delimiter
      .option("nullValue", "NULL")     # string that represents null
      .load("path/to/input_file.csv")) # load the file
# Read multiple CSV files with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load(["path/file1.csv", "path/file2.csv", "path/file3.csv"]))
# Read all CSV files in a folder with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load("/path_folder/"))
When you read multiple files into a single DataFrame, they should share the same schema. If the schemas differ, load the files into separate DataFrames first and then merge them in an additional step, as shown below.
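For example, a minimal sketch of such a merge; df1 and df2 are placeholders for the separately loaded DataFrames, and unionByName with allowMissingColumns=True (Spark 3.1+) fills columns that exist in only one of them with nulls:
# Load files with different schemas into separate DataFrames
df1 = spark.read.format("csv").option("header", "true").load("path/file1.csv")
df2 = spark.read.format("csv").option("header", "true").load("path/file2.csv")
# Merge by column name; columns missing on one side become null
merged_df = df1.unionByName(df2, allowMissingColumns=True)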
# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("path/to/output_directory")
# Write DataFrame to a CSV file with header, partitioning, and compression
(df.write.format("csv")
    .option("header", "true")          # write the header row
    .option("compression", "gzip")     # use gzip compression
    .partitionBy("year", "month")      # partition the output by these columns
    .mode("overwrite")                 # overwrite existing data
    .save("path/to/output_directory")) # output directory
By default, if you try to write directly to a single file name (e.g., name1.csv), it conflicts with how Spark works: Spark does not generate a single file but a directory containing a collection of part files.
PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.
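For example, a minimal sketch of such a write (the path dbfs:/FileStore/name1.csv is only an illustration):
# Reduce to one partition before writing; "name1.csv" will still be created as a directory
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/name1.csv")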
Even then, name1.csv in "dbfs:/FileStore/name1.csv" is treated as a directory name rather than a file name, and PySpark writes the single file inside it with a generated name like part-00000-<id>.csv.
We need to take an additional step: rename that part file to name1.csv.
# List all files in the output directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Find the part file and move it out under a temporary name (an arbitrary choice)
for file in files:
    if file.name.startswith("part-"):
        dbutils.fs.mv(file.path, "dbfs:/FileStore/name1_tmp.csv")
        break

# Remove the output directory, then rename the part file to the desired name
dbutils.fs.rm("dbfs:/FileStore/name1.csv", True)
dbutils.fs.mv("dbfs:/FileStore/name1_tmp.csv", "dbfs:/FileStore/name1.csv")

display(dbutils.fs.ls("dbfs:/FileStore/"))
Direct Write with Custom File Name
PySpark doesn't natively allow specifying a custom file name while writing, because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here's how:
Steps:
Use coalesce(1) to combine all data into a single partition.
Save the file to a temporary location.
Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/temp_folder")

# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break

# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")

# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Please do not hesitate to contact me if you have any questions at William . chen @mainri.ca