spark: RDD, Dataframe, Dataset, Transformation and Action

In Apache Spark, RDD, DataFrame, and Dataset are the core data structures used to handle distributed data. Each of these offers different levels of abstraction and optimization.

Basic Overview

FeatureRDDDataFrameDataset
DefinitionLow-level abstraction for distributed data (objects).
RDDs (Resilient Distributed Datasets) are the fundamental data structure in Spark, representing an immutable distributed collection of objects. They provide a low-level interface to Spark, allowing developers to perform operations directly on data in a functional programming style.
High-level abstraction for structured data (table).
DataFrames are similar to tables in relational databases and are used to represent structured data. They allow you to perform complex queries and operations on structured datasets with a SQL-like syntax.
Combines RDD and DataFrame, adds type safety (Scala/Java).
Datasets are an extension of DataFrames that provide the benefits of both RDDs and DataFrames, with compile-time type safety. They are particularly useful in Scala and Java for ensuring data types during compile time.
APIFunctional (map, filter, etc.)
The RDD API provides functional programming constructs, allowing transformations and actions through functions like map, filter, and reduce.
SQL-like API + relational functions.
The DataFrame API includes both SQL-like queries and relational functions, making it more user-friendly and easier to work with structured data.
Typed API + relational functions (strong typing in Scala/Java).
The Dataset API combines typed operations with the ability to perform relational functions, allowing for a more expressive and type-safe programming model in Scala/Java.
Data StructureCollection of elements (e.g., objects, arrays).
RDDs are essentially collections of objects, which can be of any type (primitive or complex). This means that users can work with various data types without a predefined schema.
Distributed table with named columns.
DataFrames represent structured data as a distributed table, where each column has a name and a type. This structured format makes it easier to work with large datasets and perform operations.
Typed distributed table with named columns.
Datasets also represent data in a structured format, but they enforce types at compile time, enhancing safety and performance, especially in statically typed languages like Scala and Java.
SchemaNo schemaDefined schema (column names)Schema with compile-time type checking (Scala/Java)
PerformanceLess optimized (no Catalyst/Tungsten)Highly optimized (Catalyst/Tungsten)Highly optimized (Catalyst/Tungsten)

Transformations and Actions

Transformations and Actions are two fundamental operations used to manipulate distributed data collections like RDDs, DataFrames, and Datasets.

actions” are operations that return the raw values, In other words, any RDD function that returns other than RDD[T] is considered as an action in spark programming.

transformations” are lazy meaning they do not get executed right away and action functions trigger to execute the transformations.

High-Level Differences

  • Transformations: These are lazy operations that define a new RDD, DataFrame, or Dataset by applying a function to an existing one. However, they do not immediately execute—Spark builds a DAG (Directed Acyclic Graph) of all transformations.
  • Actions: These are eager operations that trigger execution by forcing Spark to compute and return results or perform output operations. Actions break the laziness and execute the transformations.

Key Differences Between Transformations and Actions

FeatureTransformationsActions
DefinitionOperations that define a new dataset based on an existing one, but do not immediately execute.Operations that trigger the execution of transformations and return results or perform output.
Lazy EvaluationYes, transformations are lazily evaluated and only executed when an action is called.No, actions are eager and immediately compute the result by triggering the entire execution plan.
Execution TriggerDo not trigger computation immediately. Spark builds a DAG of transformations to optimize execution.Trigger the execution of the transformations and cause Spark to run jobs and return/output data.
Return TypeReturn a new RDD, DataFrame, or Dataset (these are still “recipes” and not materialized).Return a result to the driver program (like a value) or write data to an external storage system.
Example Operationsmap, filter, flatMap, join, groupBy, select, agg, orderBy.count, collect, first, take, reduce, saveAsTextFile, foreach.

Conclusion

  • Transformations are used to define what to do with the data but don’t execute until an action triggers them.
  • Actions are used to retrieve results or perform output, forcing Spark to execute the transformations.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Delta Table, Delta Lake

A Delta table is a type of table that builds on the Delta Lake storage layer and brings ACID (Atomicity, Consistency, Isolation, Durability) transactions, schema enforcement, and scalable metadata management to traditional data lakes. It is designed for large-scale, reliable data processing and analytics. Delta tables enable you to manage both batch and streaming data with ease, and they are ideal for environments where data integrity and consistency are critical, such as in data lakes, data warehouses, and machine learning pipelines.

What is Delta Lake

Delta lake is an open-source technology, we use Delta Lake to store data in Delta tables. Delta lake improves data storage by supporting ACID transactions, high-performance query optimizations, schema evolution, data versioning and many other features.

FeatureTraditional Data LakesDelta Lake
Transaction SupportNo ACID transactionsFull ACID support
Data ConsistencyWeak guaranteesStrong guarantees with serializable isolation
Schema EnforcementNoneEnforced and allows schema evolution
Handling StreamingRequires separate infrastructureUnified batch and streaming
Data ManagementProne to issues like data corruptionReliable with audit trails and versioning
key differences

There is detail information at “Data lake vs delta lake vs data lakehouse, and data warehouses comparison

Key Features of Delta Tables

  1. ACID Transactions: Delta Lake ensures that operations like reads, writes, and updates are atomic, consistent, isolated, and durable, eliminating issues of partial writes and data corruption.
  2. Schema Enforcement: When writing data, Delta ensures that it matches the table’s schema, preventing incorrect or incomplete data from being written.
  3. Time Travel: Delta tables store previous versions of the data, which allows you to query, rollback, and audit historical data (also known as data versioning).
  4. Unified Streaming and Batch Processing: Delta tables allow you to ingest both batch and streaming data, enabling you to work seamlessly with either approach without complex rewrites.
  5. Efficient Data Upserts: You can perform MERGE operations (UPSERTS) efficiently, which is especially useful in scenarios where you need to insert or update data based on certain conditions.
  6. Optimized Performance: Delta Lake supports optimizations such as data skipping, Z-order clustering, and auto-compaction, improving query performance.

Using Delta Tables in PySpark or SQL

If we directly query a existing delta table from ADLS using SQL, always use

 --back single quotation mark `
delta.`abfss://contain@account.dfs.windows.net/path_and_table`

Register, Create, Write a Delta table

Register a table point it to existing Delta table location

# sql
-- register a table point it to existing Delta table location
delta_table_path = "dbfs:/mnt/delta/table_path"
# Register the Delta table in the metastore
spark.sql(f"""
CREATE TABLE table_name
USING DELTA
LOCATION '{delta_table_path}'
""")

Creating a Delta Table

-- Creating a Delta Table
%sql
CREATE TABLE my_delta_table (
id int,
name string
)
USING delta
LOCATION '/mnt/delta/my_delta_table';

Write to delta table

# python
# Write a DataFrame to a Delta table
df.write.format("delta").save("/mnt/delta/my_delta_table")

# sql
-- Insert data
INSERT INTO my_delta_table VALUES (1, 'John Doe'), (2,
'Jane Doe');

Reading from a Delta table


#python
delta_df = spark.read.format("delta").load("/mnt/delta/my_delta_table")
delta_df.show()


#sql
-- Query Delta table
SELECT * FROM my_delta_table;

-- directly query delta table from adls.
-- use  ` back single quotation mark
SELECT * 
FROM 
delta.`abfss://adlsContainer@adlsAccount.dfs.windows.net/Path_and_TableName`
VERSION AS OF 4;

Managing Delta Tables

Optimizing Delta Tables

To improve performance, you can run an optimize operation to compact small files into larger ones.

# sql 
OPTIMIZE my_delta_table;

Z-order Clustering

Z-order clustering is used to improve query performance by colocating related data in the same set of files. it is a technique used in Delta Lake (and other databases) to optimize data layout for faster query performance.

# sql
OPTIMIZE my_delta_table ZORDER BY (date);

Upserts (Merge)

Delta Lake makes it easy to perform Upserts (MERGE operation), which allows you to insert or update data in your tables based on certain conditions.

using SQL scripts is the same as TSQL merge statement

% sql
MERGE INTO my_delta_table t
USING new_data n
ON t.id = n.id
WHEN MATCHED THEN UPDATE SET t.value = n.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (n.id, n.value); 

In PySpark with Delta Lake:

The target table must be a Delta table and the source data is typically in a DataFrame.

Example Scenario
  • Target Table: target_table; Contains existing records.
  • Source DataFrame: source_df; Contains new or updated records.
  • Goal: Update existing rows if a match is found or insert new rows if no match exists.
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Define paths
target_table_path = "dbfs:/mnt/delta/target_table"

# Load the Delta table as a DeltaTable object
target_table = DeltaTable.forPath(spark, target_table_path)

# Source DataFrame (new data to upsert)
source_data = [
    (1, "Alice", "2023-01-01"),
    (2, "Bob", "2023-01-02"),
    (4, "Eve", "2023-01-04")  # New record
]
columns = ["id", "name", "date"]
source_df = spark.createDataFrame(source_data, columns)

# Perform the merge operation
target_table.alias("t").merge(
    source_df.alias("s"),
    "t.id = s.id"  # Join condition: match rows based on `id`
).whenMatchedUpdate(
    set={
        "name": "s.name",  # Update `name` column
        "date": "s.date"   # Update `date` column
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",      # Insert `id`
        "name": "s.name",  # Insert `name`
        "date": "s.date"   # Insert `date`
    }
).execute()

# Verify the result
result_df = spark.read.format("delta").load(target_table_path)
result_df.show()
Explanation of the Code
  1. Target Table (target_table):
    • The Delta table is loaded using DeltaTable.forPath.
    • This table contains existing data where updates or inserts will be applied.
  2. Source DataFrame (source_df):
    • This DataFrame contains new or updated records.
  3. Join Condition ("t.id = s.id"):
    • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
  4. whenMatchedUpdate:
    • If a matching row is found, update the name and date columns in the target table.
  5. whenNotMatchedInsert:
    • If no matching row is found, insert the new record from the source DataFrame into the target table.
  6. execute():
    • Executes the merge operation, applying updates and inserts.
  7. Result Verification:
    • After the merge, the updated Delta table is read and displayed.

Conclusion

Delta Lake is a powerful solution for building reliable, high-performance data pipelines on top of data lakes. It enables advanced data management and analytics capabilities with features like ACID transactions, time travel, and schema enforcement, making it an ideal choice for large-scale, data-driven applications.

Delta tables are essential for maintaining high-quality, reliable, and performant data processing pipelines. They provide a way to bring transactional integrity and powerful performance optimizations to large-scale data lakes, enabling unified data processing for both batch and streaming use cases.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)