In PySpark, we can read from and write to CSV files using DataFrameReader and DataFrameWriter with the csv format. Here's a guide on how to work with CSV files in PySpark:
Reading CSV Files in PySpark
Syntax
df = spark.read.format("csv").options(**options).schema(schema_df).load(file_location)
format
- csv
- parquet
- orc
- json
- avro
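The same reader pattern works for the other formats; only the format name (and the relevant options) change. A minimal sketch with hypothetical paths:
# Same reader pattern with a different format (hypothetical paths)
df_parquet = spark.read.format("parquet").load("path/to/data.parquet")
df_json = spark.read.format("json").load("path/to/data.json")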
option
- header = "true" / "false"
- inferSchema = "true" / "false"
- sep = "," (or another delimiter, such as "\t" or "|")
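As an alternative to chaining .option(...) calls, options() accepts them as keyword arguments, which is what the options(options) placeholder in the syntax above stands for. A minimal sketch with a hypothetical path:
# Pass several options at once as keyword arguments
df = (spark.read.format("csv")
      .options(header="true", inferSchema="true", sep=",")
      .load("path/to/input_file.csv"))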
file_location
- load(path1)
- load([path1, path2, ...])
- load(folder)
Schema
- define a schema (a StructType or a DDL string)
- pass it to the reader with .schema(my_schema)
define a schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # column1 is an integer, nullable
    StructField("column2", StringType(), True),    # column2 is a string, nullable
    StructField("column3", StringType(), False)    # column3 is a string, non-nullable
])
# or the simpler DDL string format
schema = "column1 INT, column2 STRING, column3 STRING"
Example
Read a CSV file with a header, an inferred schema, and an explicit null value.
# Read a CSV file with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")         # use the first row as the header
      .option("inferSchema", "true")    # automatically infer the schema
      .option("sep", ",")               # specify the delimiter
      .option("nullValue", "NULL")      # string that represents null
      .load("path/to/input_file.csv"))  # load the file
# Read multiple CSV files with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load(["path/file1.csv", "path/file2.csv", "path/file3.csv"]))
# Read all CSV files in a folder with header, infer schema, and specify null value
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("sep", ",")
      .option("nullValue", "NULL")
      .load("/path_folder/"))
When you want to read multiple files into a single DataFrame but their schemas differ, load the files into separate DataFrames and merge them in an additional step, as shown below.
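For example, unionByName aligns columns by name, and with allowMissingColumns=True (Spark 3.1+) it fills columns missing on either side with nulls. A sketch with hypothetical paths:
df1 = spark.read.format("csv").option("header", "true").load("path/file1.csv")
df2 = spark.read.format("csv").option("header", "true").load("path/file2.csv")

# Columns are matched by name; missing columns are filled with nulls
merged = df1.unionByName(df2, allowMissingColumns=True)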
Writing CSV Files in PySpark
Syntax
df.write.format("csv").options(**options).save("path/to/output_directory")
Example
# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("path/to/output_directory")
# Write DataFrame to a CSV file with header, partitioning, and compression
(df.write.format("csv")
    .option("header", "true")          # write the header row
    .option("compression", "gzip")     # use gzip compression
    .partitionBy("year", "month")      # partition the output by these columns
    .mode("overwrite")                 # overwrite existing data
    .save("path/to/output_directory")) # specify the output directory