PySpark: Read and Write a Parquet File

Reading Parquet Files

Syntax

help(spark.read.parquet)


# mergeSchema: merge the schemas of all files (useful when reading multiple files with different schemas)
# pathGlobFilter: read only files whose names match the pattern
# recursiveFileLookup: recursively read files from directories and subdirectories
df = spark.read \
    .format("parquet") \
    .option("mergeSchema", "true") \
    .option("pathGlobFilter", "*.parquet") \
    .option("recursiveFileLookup", "true") \
    .load("/path/to/parquet/file/or/directory")

Options

  • mergeSchema: When reading Parquet files with different schemas, merge them into a single schema.
    • true (default: false)
  • pathGlobFilter: Allows specifying a file pattern to filter which files to read (e.g., “*.parquet”).
  • recursiveFileLookup: Reads files recursively from subdirectories.
    • true (default: false)
  • modifiedBefore/modifiedAfter: Filter files by modification time. For example:
    .option("modifiedBefore", "2023-10-01T12:00:00")
    .option("modifiedAfter", "2023-01-01T12:00:00")
  • maxFilesPerTrigger: Limits the number of files processed in a single trigger; applies to streaming reads (see the streaming sketch after the schema example below).
  • schema: Provides the schema of the Parquet file (useful when reading files without inferring schema).

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

df = spark.read.schema(schema).parquet("/path/to/parquet")
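
Of the read options above, maxFilesPerTrigger only takes effect in Structured Streaming, where file sources also require an explicit schema. A minimal sketch, reusing the schema defined above (input, output, and checkpoint paths are placeholders, not from the original example):

# Process at most 10 new Parquet files per micro-batch; file-based streaming
# sources need the schema up front, so the one defined above is reused here.
stream_df = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 10) \
    .load("/path/to/parquet/directory")

# Write the stream back out as Parquet with a checkpoint location
query = stream_df.writeStream \
    .format("parquet") \
    .option("path", "/path/to/streaming/output") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()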

Path
  • Load All Files in a Directory
    df = spark.read.parquet("/path/to/directory/")
  • Load Multiple Files Using Comma-Separated Paths
    df = spark.read.parquet("/path/to/file1.parquet", "/path/to/file2.parquet", "/path/to/file3.parquet")
  • Using Wildcards (Glob Patterns)
    df = spark.read.parquet("/path/to/directory/*.parquet")
  • Using Recursive Lookup for Nested Directories
    df = spark.read.option("recursiveFileLookup", "true").parquet("/path/to/top/directory")
  • Load Multiple Parquet Files Based on Conditions
    df = spark.read.option("modifiedAfter", "2023-01-01T00:00:00").parquet("/path/to/directory/")
  • Programmatically Load Multiple Files
    file_paths = ["/path/to/file1.parquet", "/path/to/file2.parquet", "/path/to/file3.parquet"]
    df = spark.read.parquet(*file_paths)
  • Load Files from External Storage (e.g., S3, ADLS, etc.)
    df = spark.read.parquet("s3a://bucket-name/path/to/files/")

Example


# Reading Parquet files with options
df = spark.read \
    .format("parquet") \
    .option("mergeSchema", "true") \
    .option("recursiveFileLookup", "true") \
    .load("/path/to/parquet/files")

Conclusion

To load multiple Parquet files at once, you can:

  • Load an entire directory.
  • Use wildcard patterns to match multiple files.
  • Recursively load from subdirectories.
  • Programmatically pass a list of file paths.

These options help streamline your data ingestion process when dealing with multiple Parquet files in Databricks.

Write to Parquet

Syntax


# Writing a Parquet file
# mode: "overwrite", "append", "ignore", or "error"
# compression: none, snappy, gzip, lzo, brotli, lz4, zstd, etc.
# maxRecordsPerFile: maximum number of records per output file
# partitionBy: partition the output by the given columns
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 100000) \
    .option("path", "/path/to/output/directory") \
    .partitionBy("year", "month") \
    .save()

Options

compression: .option("compression", "snappy")

Specifies the compression codec to use when writing files.
Options: none, snappy (default), gzip, lzo, brotli, lz4, zstd, etc.

maxRecordsPerFile: .option("maxRecordsPerFile", 100000)

Controls the number of records per file when writing.
Default: None (no limit).

saveAsTable: saveAsTable("parquet_table")

Saves the DataFrame as a table in the catalog.

save / path: save("/path/to/output/directory")

Defines the output directory or file path. The path can be passed directly to save() or set with .option("path", ...) before calling save(), as in the syntax block above.

mode: mode("overwrite")

Specifies the behavior if the output path already exists.

  • overwrite: Overwrites existing data.
  • append: Appends to existing data.
  • ignore: Ignores the write operation if data already exists.
  • error or errorifexists: Throws an error if data already exists (default).
partitionBy: partitionBy("year", "month")

Partitions the output by the specified columns.

bucketBy: .bucketBy(10, "id")

Distributes the data into a fixed number of buckets.

df.write \
    .format("parquet") \
    .bucketBy(10, "id") \
    .sortBy("name") \
    .saveAsTable("parquet_table")
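
Note that bucketBy works only together with saveAsTable (a plain path-based save() does not support bucketing), so the result is registered in the catalog. A small sketch of reading the bucketed table back, using the table name from the example above:

# Read the bucketed table from the catalog; bucketing on "id" can help Spark
# avoid shuffles for joins and aggregations on that column.
bucketed_df = spark.table("parquet_table")
bucketed_df.groupBy("id").count().show()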

Example


# Writing Parquet files with options
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .option("maxRecordsPerFile", 50000) \
    .partitionBy("year", "month") \
    .save("/path/to/output/directory")

Key considerations when writing:

  • Use mergeSchema when reading back Parquet files that have different schemas, but note that it adds overhead.
  • Compression can significantly reduce file size, but it can add some processing time during read and write operations.
  • Partitioning by columns is useful for organizing large datasets and improving query performance (see the partition-pruning sketch below).
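
To illustrate the last point, a minimal sketch of partition pruning when reading data that was written with partitionBy("year", "month") as in the example above (paths are placeholders):

from pyspark.sql.functions import col

# Filters on the partition columns let Spark skip entire year=/month= directories
# instead of scanning every file.
df = spark.read.parquet("/path/to/output/directory")
df_filtered = df.filter((col("year") == 2023) & (col("month") == 6))
df_filtered.explain()  # the physical plan should list PartitionFilters on year and month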

PySpark: Read, Write, and Flatten Complex Nested JSON

Reading JSON Files

Syntax

df = spark.read.option(<options>).schema(<schema>).json("/path/to/json/file")

Options

  • multiline: option("multiline", "true")
    If your JSON files contain multiple lines for a single record, you need to enable multiline.
  • mode: option("mode", "FAILFAST")
    Determines the behavior when the input file contains corrupt records. Available options:
    PERMISSIVE (default): Tries to parse all records and puts corrupt records in a new column _corrupt_record (see the sketch after the example below).
    DROPMALFORMED: Drops the corrupted records.
    FAILFAST: Fails when corrupt records are encountered.
  • primitivesAsString: option("primitivesAsString", "true")
    Treats primitives (like int, float, etc.) as strings.
  • allowUnquotedFieldNames: option("allowUnquotedFieldNames", "true")
    Allows reading JSON files with unquoted field names.
  • allowSingleQuotes: option("allowSingleQuotes", "true")
    Allows single quotes for field names and values.
  • timestampFormat: option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    Sets the format for timestamp fields.

Schema


from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

Example


df = spark.read.option("multiline", "true") \
               .option("mode", "PERMISSIVE") \
               .schema(schema) \
               .json("/path/to/input/json")
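
To actually inspect the corrupt records that PERMISSIVE mode captures, the _corrupt_record column must be declared in the schema. A minimal sketch, reusing the input path above (_corrupt_record is Spark's default column name for this):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Declare _corrupt_record explicitly so PERMISSIVE mode has a column to fill
# for records it cannot parse.
schema_with_corrupt = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

df_raw = spark.read.option("multiline", "true") \
                   .option("mode", "PERMISSIVE") \
                   .schema(schema_with_corrupt) \
                   .json("/path/to/input/json")

# Spark disallows queries that reference only the corrupt-record column of a raw
# JSON source, so cache before filtering on it.
df_raw.cache()
df_raw.filter(df_raw["_corrupt_record"].isNotNull()).show(truncate=False)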


Writing JSON Files

Syntax

df.write.option(<options>).mode("overwrite").json("/path/to/output/json")

  • mode: mode("overwrite")
    Specifies how to handle existing data. Available options:
    ·  overwrite: Overwrites existing data.
    ·  append: Appends to existing data.
    ·  ignore: Ignores the write operation if the file already exists.
    ·  error (default): Throws an error if the file exists.
  • compression: option("compression", "gzip")
    Specifies compression for the output file. Available options include gzip, bzip2, none (default).
  • dateFormat: option("dateFormat", "yyyy-MM-dd")
    Sets the format for date fields during writing.
  • timestampFormat: option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    Sets the format for timestamp fields during writing.
  • ignoreNullFields: option("ignoreNullFields", "true")
    Ignores null fields when writing JSON.
  • lineSep: option("lineSep", "\r\n")
    Custom line separator (default is \n). A sketch combining ignoreNullFields and lineSep follows the example below.

Example


df.write.mode("overwrite") \
        .option("compression", "gzip") \
        .option("dateFormat", "yyyy-MM-dd") \
        .json("/path/to/output/json")
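
The remaining write options combine in the same way; a short sketch with ignoreNullFields and a custom line separator (the output path is a placeholder):

# Drop null fields from each record and use Windows-style line endings.
df.write.mode("overwrite") \
        .option("ignoreNullFields", "true") \
        .option("lineSep", "\r\n") \
        .json("/path/to/output/json")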

Flattening the Nested JSON

Sample Complex JSON

This JSON includes nested objects and arrays. The goal is to flatten the nested structures.

{
  "name": "John",
  "age": 30,
  "address": {
    "street": "123 Main St",
    "city": "New York"
  },
  "contact": {
    "phone": "123-456-7890",
    "email": "john@example.com"
  },
  "orders": [
    {
      "id": 1,
      "item": "Laptop",
      "price": 999.99
    },
    {
      "id": 2,
      "item": "Mouse",
      "price": 49.99
    }
  ]
}

# Reading the complex JSON
df = spark.read.option("multiline", "true").json("/path/to/complex.json")

Step 1: Flattening Nested Objects

To flatten the nested JSON, use PySpark's select and explode functions to flatten the structure.


from pyspark.sql.functions import col

df_flattened = df.select(
    col("name"),
    col("age"),
    col("address.street").alias("street"),
    col("address.city").alias("city"),
    col("contact.phone").alias("phone"),
    col("contact.email").alias("email")
)
df_flattened.show(truncate=False)

This will flatten the address and contact fields.

Step 2: Flattening Arrays with explode

For fields that contain arrays (like orders), you can use explode to flatten the array into individual rows.


from pyspark.sql.functions import explode

df_flattened_orders = df.select(
    col("name"),
    col("age"),
    col("address.street").alias("street"),
    col("address.city").alias("city"),
    col("contact.phone").alias("phone"),
    col("contact.email").alias("email"),
    explode(col("orders")).alias("order")
)

# Now flatten the fields inside the "order" structure
df_final = df_flattened_orders.select(
    col("name"),
    col("age"),
    col("street"),
    col("city"),
    col("phone"),
    col("email"),
    col("order.id").alias("order_id"),
    col("order.item").alias("order_item"),
    col("order.price").alias("order_price")
)

df_final.show(truncate=False)

Output

+----+---+-----------+--------+------------+----------------+--------+----------+-----------+
|name|age|street     |city    |phone       |email           |order_id|order_item|order_price|
+----+---+-----------+--------+------------+----------------+--------+----------+-----------+
|John|30 |123 Main St|New York|123-456-7890|john@example.com|1       |Laptop    |999.99     |
|John|30 |123 Main St|New York|123-456-7890|john@example.com|2       |Mouse     |49.99      |
+----+---+-----------+--------+------------+----------------+--------+----------+-----------+

Key Functions Used:

  • col(): Accesses columns of the DataFrame.
  • alias(): Renames a column.
  • explode(): Converts an array into multiple rows, one for each element in the array.

Generalize for Deeper Nested Structures

For deeply nested JSON structures, you can apply this process recursively by continuing to use select, alias, and explode to flatten additional layers.
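
One way to do that (a sketch, not part of the original example) is a small helper that keeps expanding struct columns and exploding array columns until none remain. Unlike the manual aliases above, it names child columns parent_child (e.g., address_street):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, ArrayType

def flatten(df: DataFrame) -> DataFrame:
    """Recursively flatten struct and array columns of a DataFrame."""
    while True:
        complex_fields = {
            f.name: f.dataType
            for f in df.schema.fields
            if isinstance(f.dataType, (StructType, ArrayType))
        }
        if not complex_fields:
            return df

        name, dtype = next(iter(complex_fields.items()))
        if isinstance(dtype, StructType):
            # Expand each struct field into a top-level column named parent_child
            expanded = [
                col(name + "." + child.name).alias(name + "_" + child.name)
                for child in dtype.fields
            ]
            df = df.select("*", *expanded).drop(name)
        else:
            # Explode arrays into one row per element; explode_outer keeps rows
            # whose array is empty or null
            df = df.withColumn(name, explode_outer(col(name)))

df_flat = flatten(df)
df_flat.show(truncate=False)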