Reading JSON Files
Syntax
df = spark.read.option(options).schema(schema).json("/path/to/json/file")
Options
- multiline: option("multiline", "true")
  If your JSON files contain multiple lines for a single record, you need to enable multiline.
- mode: option("mode", "FAILFAST")
  Determines the behavior when the input file contains corrupt records. Available options:
  - PERMISSIVE (default): Tries to parse all records and puts corrupt records in a new column _corrupt_record.
  - DROPMALFORMED: Drops the corrupted records.
  - FAILFAST: Fails when corrupt records are encountered.
- primitivesAsString: option("primitivesAsString", "true")
  Treats primitives (like int, float, etc.) as strings.
- allowUnquotedFieldNames: option("allowUnquotedFieldNames", "true")
  Allows reading JSON files with unquoted field names.
- allowSingleQuotes: option("allowSingleQuotes", "true")
  Allows single quotes for field names and values.
- timestampFormat: option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  Sets the format for timestamp fields.
Schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
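Schemas can also describe nested data. Below is an illustrative sketch of a nested schema; the field names mirror the sample JSON used in the flattening section later, and the nesting is expressed with StructType and ArrayType:

from pyspark.sql.types import (ArrayType, DoubleType, IntegerType,
                               StringType, StructField, StructType)

# Sketch: a nested schema matching the sample JSON shown later in this article
nested_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([           # nested object
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
    ]), True),
    StructField("contact", StructType([           # nested object
        StructField("phone", StringType(), True),
        StructField("email", StringType(), True),
    ]), True),
    StructField("orders", ArrayType(StructType([  # array of objects
        StructField("id", IntegerType(), True),
        StructField("item", StringType(), True),
        StructField("price", DoubleType(), True),
    ])), True),
])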
Example
df = spark.read.option("multiline", "true") \
.option("mode", "PERMISSIVE") \
.schema(schema) \
.json("/path/to/input/json")
Writing JSON Files
Syntax
df.write.option(options).mode("overwrite").json("/path/to/output/json")
- mode: mode("overwrite")
  Specifies how to handle existing data. Available options:
  - overwrite: Overwrites existing data.
  - append: Appends to existing data.
  - ignore: Ignores the write operation if the file already exists.
  - error (default): Throws an error if the file exists.
- compression: option("compression", "gzip")
  Specifies compression for the output file. Available options include gzip, bzip2, and none (default).
- dateFormat: option("dateFormat", "yyyy-MM-dd")
  Sets the format for date fields during writing.
- timestampFormat: option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  Sets the format for timestamp fields during writing.
- ignoreNullFields: option("ignoreNullFields", "true")
  Ignores null fields when writing JSON.
- lineSep: option("lineSep", "\r\n")
  Custom line separator (default is \n).
Example
df.write.mode("overwrite") \
.option("compression", "gzip") \
.option("dateFormat", "yyyy-MM-dd") \
.json("/path/to/output/json")
Flattening the Nested JSON
Sample Complex JSON
This JSON includes nested objects and arrays. The goal is to flatten the nested structures.
{
"name": "John",
"age": 30,
"address": {
"street": "123 Main St",
"city": "New York"
},
"contact": {
"phone": "123-456-7890",
"email": "john@example.com"
},
"orders": [
{
"id": 1,
"item": "Laptop",
"price": 999.99
},
{
"id": 2,
"item": "Mouse",
"price": 49.99
}
]
}
Reading the Complex JSON
df = spark.read.option("multiline", "true").json("/path/to/complex.json")
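A quick way to inspect the structure Spark inferred is printSchema(). For the sample document above, the output should look roughly like this (Spark sorts inferred JSON fields alphabetically and infers whole numbers as long):

df.printSchema()
# root
#  |-- address: struct (nullable = true)
#  |    |-- city: string (nullable = true)
#  |    |-- street: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- contact: struct (nullable = true)
#  |    |-- email: string (nullable = true)
#  |    |-- phone: string (nullable = true)
#  |-- name: string (nullable = true)
#  |-- orders: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- item: string (nullable = true)
#  |    |    |-- price: double (nullable = true)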
Step 1: Flattening Nested Objects
To flatten the nested JSON, use PySpark's select and explode functions.
from pyspark.sql.functions import col
df_flattened = df.select(
col("name"),
col("age"),
col("address.street").alias("street"),
col("address.city").alias("city"),
col("contact.phone").alias("phone"),
col("contact.email").alias("email")
)
df_flattened.show(truncate=False)
This will flatten the address and contact fields.
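For the sample record, the flattened result should look like:

name | age | street      | city     | phone        | email
John | 30  | 123 Main St | New York | 123-456-7890 | john@example.com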
Step 2: Flattening Arrays with explode
For fields that contain arrays (like orders), you can use explode to flatten the array into individual rows.
from pyspark.sql.functions import explode
df_flattened_orders = df.select(
col("name"),
col("age"),
col("address.street").alias("street"),
col("address.city").alias("city"),
col("contact.phone").alias("phone"),
col("contact.email").alias("email"),
explode(col("orders")).alias("order")
)
# Now flatten the fields inside the "order" structure
df_final = df_flattened_orders.select(
col("name"),
col("age"),
col("street"),
col("city"),
col("phone"),
col("email"),
col("order.id").alias("order_id"),
col("order.item").alias("order_item"),
col("order.price").alias("order_price")
)
df_final.show(truncate=False)
Output
name | age | street      | city     | phone        | email            | order_id | order_item | order_price
John | 30  | 123 Main St | New York | 123-456-7890 | john@example.com | 1        | Laptop     | 999.99
John | 30  | 123 Main St | New York | 123-456-7890 | john@example.com | 2        | Mouse      | 49.99
Key Functions Used:
- col(): Accesses columns of the DataFrame.
- alias(): Renames a column.
- explode(): Converts an array into multiple rows, one for each element in the array.
Generalize for Deeper Nested Structures
For deeply nested JSON structures, you can apply this process recursively by continuing to use select
, alias
, and explode
to flatten additional layers.
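A minimal sketch of such a recursive helper follows. It assumes you want struct fields promoted to top-level columns with a parent_child naming scheme (so address.street becomes address_street, unlike the hand-picked aliases above) and each array exploded into one row per element. Note that explode drops rows whose arrays are empty or null; swap in explode_outer to keep them.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode
from pyspark.sql.types import ArrayType, StructType

def flatten(df: DataFrame) -> DataFrame:
    # Keep expanding until no struct or array columns remain
    while True:
        nested = [(f.name, f.dataType) for f in df.schema.fields
                  if isinstance(f.dataType, (StructType, ArrayType))]
        if not nested:
            return df
        name, dtype = nested[0]
        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column,
            # prefixed with the parent name to avoid collisions
            expanded = [col(f"{name}.{f.name}").alias(f"{name}_{f.name}")
                        for f in dtype.fields]
            others = [col(c) for c in df.columns if c != name]
            df = df.select(others + expanded)
        else:
            # One row per array element; array-of-struct elements are
            # expanded as structs on the next pass through the loop
            df = df.withColumn(name, explode(col(name)))

df_flat = flatten(df)
df_flat.show(truncate=False)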