Pyspark: read and write a csv file

In PySpark, we can read from and write to CSV files using DataFrameReader and DataFrameWriter with the csv method. Here’s a guide on how to work with CSV files in PySpark:

Reading CSV Files in PySpark

Syntax

df = spark.read.format(“csv”).options(options).load(ffile_location).schema(schema_df)

format
  • csv
  • Parquet
  • ORC
  • JSON
  • AVRO
option
  • header = “True”; “False”
  • inferSchema = “True”; ”False”
  • sep=”,” … whatever
file_location
  • load(path1)
  • load(path1,path2……)
  • load(folder)
Schema
  • define a schema
  • Schema
  • my_schema

define a schema


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # Column1 is Integer, nullable
    StructField("column2", StringType(), True),    # Column2 is String, nullable
    StructField("column3", StringType(), False)    # Column3 is String, non-nullable
])

#or simple format
schema="col1 INTEGER, col2 STRING, col3 STRING, col4 INTEGER"

Example

Read CSV file with header, infer schema, and specify null value


# Read a CSV file with header, infer schema, and specify null value
df = spark.read.format("csv") \
    .option("header", "true") \    # Use the first row as the header
    .option("inferSchema", "true")\ # Automatically infer the schema
    .option("sep", ",") \           # Specify the delimiter
    .load("path/to/input_file.csv")\ # Load the file
    .option("nullValue", "NULL" # Define a string representation of null


# Read multiple CSV files with header, infer schema, and specify null value
df = spark.read.format("csv") \ 
.option("inferSchema", "true")\     
.option("sep", ",") \             
.load("path/file1.csv", "path/file2.csv", "path/file3.csv")\   
.option("nullValue", "NULL")


# Read folder all CSV files with header, infer schema, and specify null value
df = spark.read.format("csv") \ 
.option("inferSchema", "true")\     
.option("sep", ",") \             
.load("/path_folder/)\   
.option("nullValue", "NULL")

When you want to read multiple files into a single Dataframe, if schemas are different, load files into Separate DataFrames, then take additional process to merge them together.

Writing CSV Files in PySpark

Syntax


df.write.format("csv").options(options).save("path/to/output_directory")

Example


# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("path/to/output_directory")



# Write DataFrame to a CSV file with header, partitioning, and compression

df.write.format("csv") \
  .option("header", "true") \         # Write the header
  .option("compression", "gzip") \    # Use gzip compression
  .partitionBy("year", "month") \ # Partition the output by specified columns
  .mode("overwrite") \                # Overwrite existing data
  .save("path/to/output_directory")   # Specify output directory