Read a Delta table from Blob/ADLS and write a Delta table to Blob/ADLS

If a Delta table is saved in Blob Storage or Azure Data Lake Storage (ADLS), you access it using the file path rather than a cataloged name (like in Unity Catalog). Here’s how to read from and write to Delta tables stored in Blob Storage or ADLS in Spark SQL and PySpark.
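
Before Spark can resolve a Blob/ADLS path, the cluster needs credentials for the storage account. A minimal sketch using account-key authentication (the storage account and container names are placeholders; on Databricks you would more often use a mounted path or Unity Catalog external locations):

# Hypothetical storage account/container names; substitute your own
spark.conf.set(
    "fs.azure.account.key.mystorageacct.dfs.core.windows.net",
    "<storage-account-access-key>")

# ADLS Gen2 paths then use the abfss:// scheme
path = "abfss://mycontainer@mystorageacct.dfs.core.windows.net/delta/events"
df = spark.read.format("delta").load(path)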

Reading Delta Tables from Blob Storage or ADLS

To read Delta tables from Blob Storage or ADLS, you specify the path to the Delta table and use the delta format.

Syntax

# Spark SQL (caution: the path is wrapped in backticks ` )
SELECT * FROM delta.`/mnt/path/to/delta/table`

# PySpark
df = spark.read.format("delta").load("path/to/delta/table")

Writing Delta Tables to Blob Storage or ADLS

When writing to Delta tables, use the delta format and specify the path where you want to store the table.

Spark SQL cannot directly write to a path-based Delta table in Blob or ADLS the way the PySpark DataFrame writer can (use PySpark for that). However, you can still write with SQL by inserting into the table using INSERT INTO:

# Spark SQL (caution: the path is wrapped in backticks ` )
INSERT INTO delta.`/mnt/path/to/delta/table`
SELECT * FROM my_temp_table

# PySpark 
df.write.format("delta").mode("overwrite").save("path/to/delta/table")

Options and Parameters for Delta Read/Write

Options for Reading Delta Tables:

You can configure the read operation with options like:

  • mergeSchema: Allows schema evolution if the structure of the Delta table changes.
  • spark.sql.files.ignoreCorruptFiles: A Spark session configuration (not a per-read option) that ignores corrupt files during reading.
  • versionAsOf / timestampAsOf: Enable time travel, i.e. querying older versions of the Delta table.
df = spark.read.format("delta").option("mergeSchema", "true").load("path/to/delta/table")
df.show()
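
Since spark.sql.files.ignoreCorruptFiles is a session-level configuration rather than a reader option, it is set through spark.conf; a minimal sketch:

# Session-level setting: skip files that cannot be read instead of failing the job
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
df = spark.read.format("delta").load("path/to/delta/table")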

Options for Writing Delta Tables:

mode: Controls the write mode.

  • overwrite: Overwrites the existing data.
  • append: Adds to existing data.
  • ignore: Ignores the write if data exists.
  • errorifexists (the default): Throws an error if data already exists at the target path.

partitionBy: Partition the data by one or more columns.

overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("column_name") \
    .save("path/to/delta/table")

Time Travel and Versioning with Delta (PySpark)

Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()

# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()
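
To find out which versions and timestamps are available, you can inspect the table's transaction history. A minimal sketch, assuming the delta-spark library is installed:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "path/to/delta/table")
# Each row of the history describes one commit: version, timestamp, operation, etc.
delta_table.history().select("version", "timestamp", "operation").show()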

Conclusion:

  • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
  • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
  • For reading, use .format("delta").load("path").
  • For writing, use .write.format("delta").save("path").

Read table from Unity Catalog and write table to Unity Catalog

To read from and write to Unity Catalog in PySpark, you typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name.

Reading from Unity Catalog

To read a table from Unity Catalog, specify the table’s fully qualified name:

# Reading a table
df = spark.read.table("catalog.schema.table")
df.show()

# Using Spark SQL
df = spark.sql("SELECT * FROM catalog.schema.table")

Writing to Unity Catalog

To write data to Unity Catalog, you specify the table name in the saveAsTable method:

# Writing a DataFrame to a new table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog.schema.new_table")

Options for Writing to Unity Catalog:

  • format: Set to "delta" for Delta Lake tables, as Unity Catalog uses Delta format.
  • mode: Options include overwrite, append, ignore, and error.

Example: Read, Transform, and Write Back to Unity Catalog

# Read data from a Unity Catalog table
df = spark.read.table("catalog_name.schema_name.source_table")

# Perform transformations
transformed_df = df.filter(df["column_name"] > 10)

# Write transformed data back to a different table
transformed_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog_name.schema_name.target_table")

Comparison of Delta, JSON, and CSV Reads/Writes

Delta in Unity Catalog
  • Read: df = spark.read.table("catalog.schema.table")
  • Write: df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.table")
  • Notes: Unity Catalog natively supports Delta with schema enforcement and versioning.

Delta in Blob/ADLS
  • Read: df = spark.read.format("delta").load("path/to/delta/folder")
  • Write: df.write.format("delta").mode("overwrite").save("path/to/delta/folder")
  • Notes: Requires the Delta Lake library; supports ACID transactions and time-travel capabilities.

JSON in Unity Catalog
  • Read: not directly supported; typically read as a Delta table or temporary view.
  • Write: not directly supported; convert to Delta format before writing to Unity Catalog.
  • Notes: Convert JSON to Delta format to enable integration with Unity Catalog.

JSON in Blob/ADLS
  • Read: df = spark.read.json("path/to/json/files")
  • Write: df.write.mode("overwrite").json("path/to/json/folder")
  • Notes: Simple structure, no schema enforcement by default; ideal for semi-structured data.

CSV in Unity Catalog
  • Read: not directly supported; import CSV files as Delta tables or temporary views.
  • Write: not directly supported; convert to Delta format for compatibility with Unity Catalog.
  • Notes: Like JSON, requires conversion for use in Unity Catalog.

CSV in Blob/ADLS
  • Read: df = spark.read.option("header", True).csv("path/to/csv/files")
  • Write: df.write.option("header", True).mode("overwrite").csv("path/to/csv/folder")
  • Notes: Lacks built-in schema enforcement; additional steps needed for ACID or schema evolution.
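
As the comparison above notes, JSON and CSV data is typically converted to Delta before being governed by Unity Catalog. A minimal sketch of that conversion (the abfss:// path and table name are placeholders):

# Read raw CSV from ADLS (illustrative path)
csv_df = (spark.read
          .option("header", True)
          .csv("abfss://container@account.dfs.core.windows.net/raw/data.csv"))

# Persist it as a governed Delta table in Unity Catalog
csv_df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.csv_as_delta")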

Detailed Comparison and Notes:

  1. Unity Catalog
    • Delta: Unity Catalog fully supports Delta format, allowing for schema evolution, ACID transactions, and built-in security and governance.
    • JSON and CSV: To use JSON or CSV in Unity Catalog, convert them into Delta tables or load them as temporary views before making them part of Unity’s governed catalog. This is because Unity Catalog enforces structured data formats with schema definitions.
  2. Blob Storage & ADLS (Azure Data Lake Storage)
    • Delta: Blob Storage and ADLS support Delta tables if the Delta Lake library is enabled. Delta on Blob or ADLS retains most Delta features but may lack some governance capabilities found in Unity Catalog.
    • JSON & CSV: Both Blob and ADLS provide support for JSON and CSV formats, allowing flexibility with semi-structured data. However, they do not inherently support schema enforcement, ACID compliance, or governance features without Delta Lake.
  3. Delta Table Benefits:
    • Schema Evolution and Enforcement: Delta enables schema evolution, essential in big data environments.
    • Time Travel: Delta provides versioning, allowing access to past versions of data.
    • ACID Transactions: Delta ensures consistency and reliability in large-scale data processing.

Join(), union(), unionAll(), unionByName(), fill(), fillna()

join()

The join() method is used to combine two DataFrames based on a common column or multiple columns. This method is extremely versatile, supporting various types of SQL-style joins such as inner, outer, left, and right joins.

Syntax

DataFrame.join(other, on=None, how=None)

Parameters

  • other: The other DataFrame to join with the current DataFrame.
  • on: A string or a list of column names on which to join. This can also be an expression (using col() or expr()).

  • how: The type of join to perform. One of:

  • 'inner': Inner join (default). Returns rows that have matching values in both DataFrames.
  • 'outer' or 'full': Full outer join. Returns all rows from both DataFrames, with null values for missing matches.
  • 'left' or 'left_outer': Left outer join. Returns all rows from the left DataFrame and matched rows from the right DataFrame. Unmatched rows from the right DataFrame result in null values.
  • 'right' or 'right_outer': Right outer join. Returns all rows from the right DataFrame and matched rows from the left DataFrame. Unmatched rows from the left DataFrame result in null values.
  • 'left_semi': Left semi join. Returns only the rows from the left DataFrame where the join condition is satisfied.
  • 'left_anti': Left anti join. Returns only the rows from the left DataFrame where no match is found in the right DataFrame.
  • 'cross': Cross join (Cartesian product). Returns the Cartesian product of both DataFrames, meaning every row from the left DataFrame is combined with every row from the right DataFrame.
sample datasets
df1
+-------+-------+
|   name|dept_id|
+-------+-------+
|  Alice|      1|
|    Bob|      2|
|Charlie|      3|
+-------+-------+
df2
+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|      1|         HR|
|      2|    Finance|
|      4|Engineering|
+-------+-----------+
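
For reference, the two sample DataFrames can be created like this (a minimal sketch):

df1 = spark.createDataFrame(
    [("Alice", 1), ("Bob", 2), ("Charlie", 3)],
    ["name", "dept_id"])

df2 = spark.createDataFrame(
    [(1, "HR"), (2, "Finance"), (4, "Engineering")],
    ["dept_id", "dept_name"])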

# Inner join (default)
df_inner = df1.join(df2, on="dept_id")

==output==
+-------+-----+---------+
|dept_id| name|dept_name|
+-------+-----+---------+
|      1|Alice|       HR|
|      2|  Bob|  Finance|
+-------+-----+---------+

The other join types behave the same way as their SQL counterparts. Semi and anti joins are newer to many SQL users, so let’s focus on those:

  • Left Semi Join: Only returns rows from the left DataFrame that have matches in the right DataFrame.
  • Left Anti Join: Only returns rows from the left DataFrame that don’t have matches in the right DataFrame.
# Left semi join
df_left_semi = df1.join(df2, on="dept_id", how="left_semi")
df_left_semi.show()

==output==
+-------+-----+
|dept_id| name|
+-------+-----+
|      1|Alice|
|      2|  Bob|
+-------+-----+
Charlie's dept_id is 3, which does not appear in df2, so his row is skipped.


# Left anti join
df_left_anti = df1.join(df2, on="dept_id", how="left_anti")
df_left_anti.show()

==output==
+-------+-------+
|dept_id|   name|
+-------+-------+
|      3|Charlie|
+-------+-------+
Only Charlie's dept_id (3) has no match in df2, so his row is the only one returned.
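
As mentioned in the parameters, the join condition can also be an explicit column expression rather than a column-name string; in that case both dept_id columns are kept in the result. A short sketch:

# Join on an explicit expression; both dept_id columns are retained
df_expr_join = df1.join(df2, df1["dept_id"] == df2["dept_id"], how="inner")
df_expr_join.show()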

union ()

The union() method is used to combine two DataFrames with the same schema (i.e., same number and type of columns). This operation concatenates the rows of the two DataFrames, similar to a SQL UNION operation, but without removing duplicate rows (like UNION ALL in SQL).

Syntax

DataFrame.union(other)

Key Points

  • Schema Compatibility: Both DataFrames must have the same number of columns, and the data types of the corresponding columns must match.
  • Union Behavior: Unlike SQL’s UNION which removes duplicates, union() in PySpark keeps all rows, including duplicates. This is equivalent to SQL’s UNION ALL.
  • Order of Rows: The rows from the first DataFrame will appear first, followed by the rows from the second DataFrame.
  • Positional Matching: The column names don’t need to be identical in both DataFrames, but the column positions and data types must match. If the number of columns or the data types don’t align, an error will be raised.
  • Union with Different Column Names: Even though column names don’t need to be the same, the columns are merged by position, not by name. If you attempt to union() DataFrames with different column orders, the results could be misleading. Therefore, it’s important to make sure the schemas match.
sample datasets
df1
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+

df2:
+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
|  David| 40|
+-------+---+

# Union the two DataFrames
df_union = df1.union(df2)

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+

unionByName ()

The unionByName() method in PySpark is similar to the union() method but with a key distinction: it merges two DataFrames by aligning columns based on column names, rather than their positions.

Syntax

DataFrame.unionByName(other, allowMissingColumns=False)

Parameters

  • other: The other DataFrame to be unioned with the current DataFrame.
  • allowMissingColumns: A boolean flag (False by default). If True, this allows the union of DataFrames even if one DataFrame has columns that are missing in the other. The missing columns in the other DataFrame will be filled with null values.

Key Points

  • Column Name Alignment: The method aligns columns by name, not by their position, which makes it flexible for combining DataFrames that have different column orders.
  • Handling Missing Columns: By default, if one DataFrame has columns that are missing in the other, PySpark will throw an error. However, setting allowMissingColumns=True allows unioning in such cases, and missing columns in one DataFrame will be filled with null values in the other.
  • Duplicate Rows: Just like union(), unionByName() does not remove duplicates, and the result includes all rows from both DataFrames.
sample datasets
df1
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+

df2:
+---+-------+
|age|   name|
+---+-------+
| 35|Charlie|
| 40|  David|
+---+-------+

# Union the two DataFrames
df_union = df1.unionByName(df2)

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+

Handling Missing Columns with allowMissingColumns=True

sample df
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+

+---+-------+-------+
|age|   name|  title|
+---+-------+-------+
| 35|Charlie|Manager|
+---+-------+-------+

# Union by name with missing columns allowed
# Alice's DataFrame has no "title" column; it is filled with null
df_union_missing_columns = df1.unionByName(df2, allowMissingColumns=True)

==output==
+-------+---+-------+
|   name|age|  title|
+-------+---+-------+
|  Alice| 25|   null|
|Charlie| 35|Manager|
+-------+---+-------+

Multiple Columns and Different Schemas

Sample Df
+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|Alice| 25|New York|
+-----+---+--------+

+---+----+
|age|name|
+---+----+
| 30| Bob|
+---+----+

# Union by name with missing columns allowed
df_union = df1.unionByName(df2, allowMissingColumns=True)

==output==
+-----+---+--------+
| name|age|    city|
+-----+---+--------+
|Alice| 25|New York|
|  Bob| 30|    null|
+-----+---+--------+

unionAll ()

unionAll() was an older method used to combine two DataFrames without removing duplicates. However, starting from PySpark 2.0, unionAll() has been deprecated and replaced by union(). The behavior of unionAll() is now identical to that of union() in PySpark.

See union() above for the details of this behavior.


fillna (), df.na.fill()

fillna() is a method used to replace null (or missing) values in a DataFrame with a specified value. It returns a new DataFrame with the null values replaced.

Syntax

DataFrame.fillna(value, subset=None)

df.na.fill(value, subset=None)

df.na.fill(value, subset=None) is an alias of df.fillna() and produces the same result.

Parameters

  • value: The value to replace null with. It can be a scalar value (applied across all columns) or a dictionary (to specify column-wise replacement values). The type of value should match the type of the column you are applying it to (e.g., integers for integer columns, strings for string columns).
  • subset: Specifies the list of column names where the null values will be replaced.
    If not provided, the replacement is applied to all columns.
sample df
+-------+----+----+
|   name| age|dept|
+-------+----+----+
|  Alice|  25|null|
|    Bob|null|  HR|
|Charlie|  30|null|
+-------+----+----+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)

"age" is long, "dept" is string

# Fill null values with default values for all columns
df_filled = df.fillna(0)

==output==
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
Bob's age is filled with 0 because age is a long column; the dept column is not filled and remains null, since the integer 0 does not match its string type.

Handle different Columns with different data type

# Fill nulls in the 'age' column with 0 and in the 'dept' column with "Unknown"
df_filled_columns = df.fillna({"age": 0, "dept": "Unknown"})

df_filled_columns.show()

==output==
+-------+---+-------+
|   name|age|   dept|
+-------+---+-------+
|  Alice| 25|Unknown|
|    Bob|  0|     HR|
|Charlie| 30|Unknown|
+-------+---+-------+

Fill Nulls in Specific Subset of Columns

# Fill null values only in the 'age' column
df_filled_age = df.fillna(0, subset=["age"])

df_filled_age.show()
+-------+---+----+
|   name|age|dept|
+-------+---+----+
|  Alice| 25|null|
|    Bob|  0|  HR|
|Charlie| 30|null|
+-------+---+----+
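
The df.na.fill() alias takes the same arguments; for example, a sketch equivalent to the dictionary example above:

# Same result as df.fillna({"age": 0, "dept": "Unknown"})
df_filled_na = df.na.fill({"age": 0, "dept": "Unknown"})
df_filled_na.show()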


select()

select() is used to project a subset of columns from a DataFrame or to create new columns based on expressions. It returns a new DataFrame containing only the selected columns or expressions.

Syntax

DataFrame.select(*cols)
df.select("id", "name")
df.select(df.id, df.name)
df.select(df["id"], df["name"])
df.select("name", col("age") + 5)

sample dataframe
+-------+---+---------+
|   name|age|     dept|
+-------+---+---------+
|  Alice| 25|       HR|
|    Bob| 30|  Finance|
|Charlie| 35|Marketing|
+-------+---+---------+
# Select specific columns (name and age)
df_selected = df.select("name", "age")
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+


# Select and transform columns; a derived column is added
from pyspark.sql.functions import col

df_transformed = df.select("name", df.age, col("age") + 5)
+-------+---+---------+
|   name|age|(age + 5)|
+-------+---+---------+
|  Alice| 25|       30|
|    Bob| 30|       35|
|Charlie| 35|       40|
+-------+---+---------+

# Using Expressions Inside select()
from pyspark.sql.functions import expr

# Select columns using expressions
df_expr = df.select("name", expr("age + 10").alias("age_plus_10"))
+-------+-----------+
|   name|age_plus_10|
+-------+-----------+
|  Alice|         35|
|    Bob|         40|
|Charlie|         45|
+-------+-----------+

# Select All Columns
df_all_columns = df.select("*")
+-------+---+---------+
|   name|age|     dept|
+-------+---+---------+
|  Alice| 25|       HR|
|    Bob| 30|  Finance|
|Charlie| 35|Marketing|
+-------+---+---------+

distinct(), dropDuplicates(), orderBy(), sort(), groupBy(), agg()

distinct()

distinct() is used to remove duplicate rows from a DataFrame or RDD, leaving only unique rows. It returns a new DataFrame containing the unique rows of the original.

Syntax:

DataFrame.distinct()

Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Apply distinct() method
distinct_df = df.distinct()

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
# Selecting Distinct Values for Specific Columns

#sample DF
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 35|
|  1|Alice| 29|
|  3|Cathy| 29|
+---+-----+---+
distinct_columns = df.select("name", "age").distinct()
distinct_columns.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|  Bob| 35|
|Cathy| 29|
+-----+---+

dropDuplicates ()

dropDuplicates () is used to remove duplicate rows from a DataFrame based on one or more specific columns.

Syntax:

DataFrame.dropDuplicates([col1, col2, …, coln])

Parameters

cols (optional): This is a list of column names based on which you want to drop duplicates. If no column names are provided, dropDuplicates() will behave similarly to distinct(), removing duplicates across all columns.

Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|  Alice| 25|
|Charlie| 35|
+-------+---+
# Drop duplicates across all columns (similar to distinct)
df_no_duplicates = df.dropDuplicates()

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

# Drop duplicates based on the "name" column
df_unique_names = df.dropDuplicates(["name"])

==output==
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

In the second case, one row per name is kept and the other occurrences are dropped, regardless of the values in the other columns.


orderBy(), sort ()

The orderBy() or sort() method is used to sort the rows of a DataFrame based on one or more columns, in ascending or descending order. It is equivalent to the SQL ORDER BY clause and returns a new DataFrame sorted by the provided column(s).

In PySpark, both orderBy() and sort() methods are available, and they are essentially aliases of each other, with no functional difference. You can use either based on preference:

Syntax:

DataFrame.orderBy(*cols, **kwargs)
DataFrame.sort(*cols, **kwargs)

Parameters

  • cols: Column(s) or expressions to sort by. These can be column names as strings, or PySpark Column objects wrapped in asc()/desc() to set the sorting direction.
  • ascending (keyword argument): A boolean, or a list of booleans (one per column), controlling the sort direction.
Sample dataframe
+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 20|
|  David| 35|
+-------+---+
# Sort by 'age' column in ascending order (default)
df_sorted = df.orderBy("age")
df_ordered = df.sort("age")

==output==
+-------+---+
|   name|age|
+-------+---+
|Charlie| 20|
|  Alice| 25|
|    Bob| 30|
|  David| 35|
+-------+---+

# Sorting by multiple columns
original dataset
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|    Bob|  Male| 30|
|  Alice|Female| 22|
|Charlie|  Male| 20|
+-------+------+---+

# Method 1: Using a list of column names
df_sorted1 = df.orderBy(["name", "age"], ascending=[True, False])

# Method 2: Using asc() and desc()
from pyspark.sql.functions import asc, desc
df_sorted2 = df.orderBy(asc("name"), desc("age"))

# Method 3: Mixing column references
df_sorted3 = df.orderBy(df["name"], df.age, ascending=[True, False])
df_sorted3.show()
==output==
+-------+------+---+
|   name|gender|age|
+-------+------+---+
|  Alice|Female| 25|
|  Alice|Female| 22|
|    Bob|  Male| 30|
|Charlie|  Male| 20|
+-------+------+---+

groupBy ()

groupBy() is a method used to group rows in a DataFrame based on one or more columns, similar to SQL’s GROUP BY clause.

Return Type:

It returns a GroupedData object, on which you can apply aggregate functions (agg(), count(), sum(), etc.) to perform computations on the grouped data.

Syntax:

DataFrame.groupBy(*cols)

Parameters

cols: One or more column names or expressions to group by.

Sample dataframe
+-------+----------+
|   name|department|
+-------+----------+
|  Alice|     Sales|
|    Bob|     Sales|
|Charlie|        HR|
|  David|        HR|
|    Eve|     Sales|
+-------+----------+
# Group by department and count the number of employees in each department
df_grouped = df.groupBy("department").count()

==output==
+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|        HR|    2|
+----------+-----+

# Group by multiple columns
original dataset
+-------+----------+------+
|   name|department|gender|
+-------+----------+------+
|  Alice|     Sales|Female|
|    Bob|     Sales|  Male|
|Charlie|        HR|  Male|
|  David|        HR|  Male|
|    Eve|     Sales|Female|
+-------+----------+------+

# Group by department and gender, then count the number of employees in each group
df_grouped_multi = df.groupBy("department", "gender").count()

+----------+------+-----+
|department|gender|count|
+----------+------+-----+
|     Sales|Female|    2|
|     Sales|  Male|    1|
|        HR|  Male|    2|
+----------+------+-----+

agg ()

The agg() function in PySpark is used for performing aggregate operations on a DataFrame, such as computing sums, averages, counts, and other aggregations. It is often used in combination with groupBy() to perform group-wise aggregations.

Syntax:

from pyspark.sql.functions import sum, avg, count, max, min
DataFrame.agg(*exprs)

Common aggregate functions

  • sum(column): Sum of values in the column.
  • avg(column): Average of values in the column.
  • count(column): Number of rows or distinct values.
  • max(column): Maximum value in the column.
  • min(column): Minimum value in the column.
Sample dataframe
+-------+---+------+
|   name|age|salary|
+-------+---+------+
|  Alice| 25|  5000|
|    Bob| 30|  6000|
|Charlie| 20|  4000|
+-------+---+------+
# Apply aggregate functions on the DataFrame
df_agg = df.agg(sum("salary").alias("total_salary"), avg("age").alias("avg_age"))

==output==
+------------+-------+
|total_salary|avg_age|
+------------+-------+
|       15000|   25.0|
+------------+-------+

Aggregating with groupBy()

sample data
+-------+----------+------+
|   name|department|salary|
+-------+----------+------+
|  Alice|     Sales|  5000|
|    Bob|     Sales|  6000|
|Charlie|        HR|  4000|
|  David|        HR|  4500|
+-------+----------+------+

# Group by department and aggregate the sum and average of salaries
df_grouped_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    count("name").alias("num_employees")
)

+----------+------------+----------+-------------+
|department|total_salary|avg_salary|num_employees|
+----------+------------+----------+-------------+
|     Sales|       11000|    5500.0|            2|
|        HR|        8500|    4250.0|            2|
+----------+------------+----------+-------------+

Aggregating multiple columns with different functions

from pyspark.sql.functions import sum, count, avg, max

df_grouped_multi_agg = df.groupBy("department").agg(
    sum("salary").alias("total_salary"),
    count("name").alias("num_employees"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
)

+----------+------------+-------------+----------+----------+
|department|total_salary|num_employees|avg_salary|max_salary|
+----------+------------+-------------+----------+----------+
|     Sales|       11000|            2|    5500.0|      6000|
|        HR|        8500|            2|    4250.0|      4500|
+----------+------------+-------------+----------+----------+

PySpark: read and write a CSV file

In PySpark, we can read from and write to CSV files using DataFrameReader and DataFrameWriter with the csv method. Here’s a guide on how to work with CSV files in PySpark:

Reading CSV Files in PySpark

Syntax

df = spark.read.format("csv").schema(schema_df).options(**options).load(file_location)

format
  • csv
  • parquet
  • orc
  • json
  • avro
option
  • header = "true" / "false"
  • inferSchema = "true" / "false"
  • sep = "," (or whatever delimiter the file uses)
file_location
  • load(path1)
  • load([path1, path2, ...])
  • load(folder)
schema
  • define a schema explicitly (see below)
  • pass it with .schema(my_schema)

define a schema


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("column1", IntegerType(), True),   # Column1 is Integer, nullable
    StructField("column2", StringType(), True),    # Column2 is String, nullable
    StructField("column3", StringType(), False)    # Column3 is String, non-nullable
])

#or simple format
schema="col1 INTEGER, col2 STRING, col3 STRING, col4 INTEGER"

Example

Read CSV file with header, infer schema, and specify null value


# Read a CSV file: use the first row as the header, infer the schema,
# set the delimiter, and treat the string "NULL" as null
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NULL") \
    .load("path/to/input_file.csv")


# Read multiple CSV files with header, inferred schema, and a null marker
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NULL") \
    .load(["path/file1.csv", "path/file2.csv", "path/file3.csv"])


# Read all CSV files in a folder with header, inferred schema, and a null marker
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .option("nullValue", "NULL") \
    .load("/path_folder/")

When you want to read multiple files into a single DataFrame, their schemas must be compatible. If the schemas differ, load the files into separate DataFrames and merge them in an additional step, as shown below.
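
A minimal sketch of that merge, using unionByName() with allowMissingColumns=True (the file paths are placeholders):

df_a = spark.read.option("header", "true").csv("path/file_a.csv")
df_b = spark.read.option("header", "true").csv("path/file_b.csv")

# Align columns by name; columns missing on one side are filled with null
merged = df_a.unionByName(df_b, allowMissingColumns=True)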

Writing CSV Files in PySpark

Syntax


df.write.format("csv").options(options).save("path/to/output_directory")

Example


# Write the result DataFrame to a new CSV file
result_df.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("path/to/output_directory")



# Write DataFrame to CSV: include the header, gzip-compress the output,
# partition by the given columns, and overwrite any existing data
df.write.format("csv") \
    .option("header", "true") \
    .option("compression", "gzip") \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .save("path/to/output_directory")

Spark: RDD, DataFrame, Dataset, Transformation and Action

In Apache Spark, RDD, DataFrame, and Dataset are the core data structures used to handle distributed data. Each of these offers different levels of abstraction and optimization.

Basic Overview

Definition
  • RDD: Low-level abstraction for distributed data (objects). RDDs (Resilient Distributed Datasets) are the fundamental data structure in Spark, representing an immutable distributed collection of objects. They provide a low-level interface to Spark, allowing developers to perform operations directly on data in a functional programming style.
  • DataFrame: High-level abstraction for structured data (table). DataFrames are similar to tables in relational databases and are used to represent structured data. They allow you to perform complex queries and operations on structured datasets with a SQL-like syntax.
  • Dataset: Combines RDD and DataFrame, adds type safety (Scala/Java). Datasets are an extension of DataFrames that provide the benefits of both RDDs and DataFrames, with compile-time type safety. They are particularly useful in Scala and Java for ensuring data types during compile time.

API
  • RDD: Functional (map, filter, etc.). The RDD API provides functional programming constructs, allowing transformations and actions through functions like map, filter, and reduce.
  • DataFrame: SQL-like API plus relational functions. The DataFrame API includes both SQL-like queries and relational functions, making it more user-friendly and easier to work with structured data.
  • Dataset: Typed API plus relational functions (strong typing in Scala/Java). The Dataset API combines typed operations with the ability to perform relational functions, allowing for a more expressive and type-safe programming model in Scala/Java.

Data structure
  • RDD: Collection of elements (e.g., objects, arrays). RDDs are essentially collections of objects of any type (primitive or complex), so users can work with various data types without a predefined schema.
  • DataFrame: Distributed table with named columns. DataFrames represent structured data as a distributed table, where each column has a name and a type. This structured format makes it easier to work with large datasets and perform operations.
  • Dataset: Typed distributed table with named columns. Datasets also represent data in a structured format, but they enforce types at compile time, enhancing safety and performance, especially in statically typed languages like Scala and Java.

Schema
  • RDD: No schema.
  • DataFrame: Defined schema (column names).
  • Dataset: Schema with compile-time type checking (Scala/Java).

Performance
  • RDD: Less optimized (no Catalyst/Tungsten).
  • DataFrame: Highly optimized (Catalyst/Tungsten).
  • Dataset: Highly optimized (Catalyst/Tungsten).

Transformations and Actions

Transformations and Actions are two fundamental operations used to manipulate distributed data collections like RDDs, DataFrames, and Datasets.

High-Level Differences

  • Transformations: These are lazy operations that define a new RDD, DataFrame, or Dataset by applying a function to an existing one. However, they do not immediately execute—Spark builds a DAG (Directed Acyclic Graph) of all transformations.
  • Actions: These are eager operations that trigger execution by forcing Spark to compute and return results or perform output operations. Actions break the laziness and execute the transformations.
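
A small sketch of this laziness in PySpark; nothing runs until the action on the last line:

df = spark.range(1_000_000)                          # source DataFrame: no job runs yet
filtered = df.filter(df["id"] % 2 == 0)              # transformation: added to the DAG, still lazy
doubled = filtered.selectExpr("id * 2 AS doubled")   # another lazy transformation

print(doubled.count())                               # action: triggers the whole plan to execute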

Key Differences Between Transformations and Actions

  • Definition. Transformations: operations that define a new dataset based on an existing one, but do not immediately execute. Actions: operations that trigger the execution of transformations and return results or perform output.
  • Lazy evaluation. Transformations: yes, lazily evaluated and only executed when an action is called. Actions: no, eager; they immediately compute the result by triggering the entire execution plan.
  • Execution trigger. Transformations: do not trigger computation immediately; Spark builds a DAG of transformations to optimize execution. Actions: trigger the execution of the transformations, causing Spark to run jobs and return/output data.
  • Return type. Transformations: a new RDD, DataFrame, or Dataset (still a "recipe", not materialized). Actions: a result returned to the driver program (like a value), or data written to an external storage system.
  • Example operations. Transformations: map, filter, flatMap, join, groupBy, select, agg, orderBy. Actions: count, collect, first, take, reduce, saveAsTextFile, foreach.

Conclusion

  • Transformations are used to define what to do with the data but don’t execute until an action triggers them.
  • Actions are used to retrieve results or perform output, forcing Spark to execute the transformations.