Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS

If a Delta table is saved in Blob Storage or Azure Data Lake Storage (ADLS), you access it using the file path rather than a cataloged name (like in Unity Catalog). Here’s how to read from and write to Delta tables stored in Blob Storage or ADLS in Spark SQL and PySpark.

Reading Delta Tables from Blob Storage or ADLS

To read Delta tables from Blob Storage or ADLS, you specify the path to the Delta table and use the delta format.

Syntax

# Spark SQL
-- caution: the path is wrapped in backticks (`)
SELECT * FROM delta.`/mnt/path/to/delta/table`;

# PySpark
df = spark.read.format("delta").load("path/to/delta/table")
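
Because the backticks are easy to get wrong, it can help to build the path reference in one place. The following is a minimal sketch in plain Python; delta_path_ref and select_all_sql are hypothetical helper names, and rejecting paths that contain a backtick is a simplification chosen for this example.

```python
def delta_path_ref(path: str) -> str:
    """Return a Spark SQL reference of the form delta.`<path>`."""
    if not path or "`" in path:
        # Assumption of this sketch: refuse paths that would break the quoting.
        raise ValueError(f"unsupported Delta path: {path!r}")
    return f"delta.`{path}`"


def select_all_sql(path: str) -> str:
    """Build a SELECT over a path-based Delta table."""
    return f"SELECT * FROM {delta_path_ref(path)}"


query = select_all_sql("/mnt/path/to/delta/table")
print(query)
# With an active Spark session: df = spark.sql(query)
```

The string is built without touching Spark, so it can be checked before it is passed to spark.sql.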
  

Writing Delta Tables to Blob Storage or ADLS

When writing to Delta tables, use the delta format and specify the path where you want to store the table.

Creating a new Delta table at a path is usually done from PySpark with save(). If the Delta table already exists at the path, Spark SQL can append to it using INSERT INTO:

# SparkSQL
-- caution: the path is wrapped in backticks (`)
INSERT INTO delta.`/mnt/path/to/delta/table`
SELECT * FROM my_temp_table;

# PySpark 
df.write.format("delta").mode("overwrite").save("path/to/delta/table")

Options and Parameters for Delta Read/Write

Options for Reading Delta Tables:

You can configure the read operation with options like:

  • versionAsOf / timestampAsOf: Read an older version of the Delta table (time travel).
  • spark.sql.files.ignoreCorruptFiles: A Spark configuration (set with spark.conf.set) that skips corrupt files during reading.
  • mergeSchema: Mainly a write-side option for Delta schema evolution. A Delta read takes its schema from the transaction log, so this option is rarely needed when reading.
df = spark.read.format("delta").option("versionAsOf", 0).load("path/to/delta/table")
df.show()

Options for Writing Delta Tables:

mode: Controls the write mode.

  • overwrite: Overwrites the existing data.
  • append: Adds to existing data.
  • ignore: Ignores the write if data exists.
  • errorifexists (or error): The default; throws an error if data already exists.

partitionBy: Partition the data by one or more columns.

overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("column_name") \
    .save("path/to/delta/table")
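
The write options above can be wrapped in a small helper that validates its arguments before touching the data. This is a sketch under stated assumptions: write_delta is a hypothetical name, df is expected to be a PySpark DataFrame, and refusing overwriteSchema outside overwrite mode is a guard chosen for this example, not something the source prescribes.

```python
from typing import Any, Optional, Sequence

VALID_MODES = {"overwrite", "append", "ignore", "error", "errorifexists"}


def write_delta(df: Any, path: str, mode: str = "errorifexists",
                partition_by: Optional[Sequence[str]] = None,
                overwrite_schema: bool = False) -> None:
    """Write df as a Delta table at path after validating the arguments."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown write mode: {mode!r}")
    if overwrite_schema and mode != "overwrite":
        # Guard chosen by this sketch: the option only makes sense on overwrite.
        raise ValueError("overwrite_schema requires mode='overwrite'")

    writer = df.write.format("delta").mode(mode)
    if overwrite_schema:
        writer = writer.option("overwriteSchema", "true")
    if partition_by:
        writer = writer.partitionBy(*partition_by)
    writer.save(path)


# With a real DataFrame:
# write_delta(df, "path/to/delta/table", mode="overwrite",
#             partition_by=["column_name"], overwrite_schema=True)
```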

Time Travel and Versioning with Delta (PySpark)

Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()

# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()
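
Since a read should use either a version or a timestamp, not both, the choice can be made explicit. The sketch below is plain Python; time_travel_options is a hypothetical helper that returns the option dictionary for the two reader options shown above.

```python
from typing import Dict, Optional


def time_travel_options(version: Optional[int] = None,
                        timestamp: Optional[str] = None) -> Dict[str, str]:
    """Return reader options for exactly one of versionAsOf / timestampAsOf."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        if version < 0:
            raise ValueError("version must be non-negative")
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": str(timestamp)}


opts = time_travel_options(version=2)
print(opts)
# With an active Spark session:
# df = spark.read.format("delta").options(**opts).load("path/to/delta/table")
```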

Conclusion:

  • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
  • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
  • For reading, use .format("delta").load("path").
  • For writing, use .write.format("delta").save("path").

Read table from Unity Catalog and write table to Unity Catalog

To read from and write to Unity Catalog in PySpark, you typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name.

Reading from Unity Catalog

To read a table from Unity Catalog, specify the table’s full path:

# Reading a table
df = spark.read.table("catalog.schema.table")
df.show()

# Using Spark SQL
df = spark.sql("SELECT * FROM catalog.schema.table")

Writing to Unity Catalog

To write data to Unity Catalog, you specify the table name in the saveAsTable method:

# Writing a DataFrame to a new table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog.schema.new_table")

Options for Writing to Unity Catalog:

  • format: Set to "delta" for Delta Lake tables; Unity Catalog managed tables use the Delta format.
  • mode: Options include overwrite, append, ignore, and error.

Example: Read, Transform, and Write Back to Unity Catalog

# Read data from a Unity Catalog table
df = spark.read.table("catalog_name.schema_name.source_table")

# Perform transformations
transformed_df = df.filter(df["column_name"] > 10)

# Write transformed data back to a different table
transformed_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("catalog_name.schema_name.target_table")

Comparison of Delta, JSON, and CSV Reads/Writes

Format | Storage Location | Read Syntax | Write Syntax | Notes
--- | --- | --- | --- | ---
Delta | Unity Catalog | df = spark.read.table("catalog.schema.table") | df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.table") | Unity Catalog natively supports Delta with schema enforcement and versioning.
Delta | Blob/ADLS | df = spark.read.format("delta").load("path/to/delta/folder") | df.write.format("delta").mode("overwrite").save("path/to/delta/folder") | Requires Delta Lake library; supports ACID transactions and time-travel capabilities.
JSON | Unity Catalog | Not directly supported in Unity Catalog; typically needs to be read as a Delta table or temporary table. | Not directly supported; must be converted to Delta format before writing to Unity Catalog. | Convert JSON to Delta format to enable integration with Unity Catalog.
JSON | Blob/ADLS | df = spark.read.json("path/to/json/files") | df.write.mode("overwrite").json("path/to/json/folder") | Simple structure, no schema enforcement by default; ideal for semi-structured data.
CSV | Unity Catalog | Not directly supported; CSV files should be imported as Delta tables or temporary views. | Not directly supported; convert to Delta format for compatibility with Unity Catalog. | Similar to JSON, requires conversion for use in Unity Catalog.
CSV | Blob/ADLS | df = spark.read.option("header", True).csv("path/to/csv/files") | df.write.option("header", True).mode("overwrite").csv("path/to/csv/folder") | Lacks built-in schema enforcement; additional steps needed for ACID or schema evolution.

Detailed Comparison and Notes:

  1. Unity Catalog
    • Delta: Unity Catalog fully supports Delta format, allowing for schema evolution, ACID transactions, and built-in security and governance.
    • JSON and CSV: To use JSON or CSV in Unity Catalog, convert them into Delta tables or load them as temporary views before making them part of Unity’s governed catalog. This is because Unity Catalog managed tables are stored in the Delta format with a defined schema.
  2. Blob Storage & ADLS (Azure Data Lake Storage)
    • Delta: Blob Storage and ADLS support Delta tables if the Delta Lake library is enabled. Delta on Blob or ADLS retains most Delta features but may lack some governance capabilities found in Unity Catalog.
    • JSON & CSV: Both Blob and ADLS provide support for JSON and CSV formats, allowing flexibility with semi-structured data. However, they do not inherently support schema enforcement, ACID compliance, or governance features without Delta Lake.
  3. Delta Table Benefits:
    • Schema Evolution and Enforcement: Delta enables schema evolution, essential in big data environments.
    • Time Travel: Delta provides versioning, allowing access to past versions of data.
    • ACID Transactions: Delta ensures consistency and reliability in large-scale data processing.

Delta: Schema Evolution

Schema Evolution in Databricks refers to the ability to automatically adapt and manage changes in the structure (schema) of a Delta Lake table over time. It allows users to modify the schema of an existing table (e.g., adding or updating columns) without the need for a complete rewrite of the data.

Key Features of Schema Evolution

  1. Automatic Adaptation: Delta Lake can automatically evolve the schema of a table when new columns are added to the incoming data, or for certain compatible data type changes, if the relevant configurations are enabled.
  2. Backward and Forward Compatibility: Delta Lake ensures that new data can be written to a table without breaking the existing schema. It also ensures that existing queries remain compatible, even if the schema changes.

Configuration for Schema Evolution

  • mergeSchema
    This option allows you to append new data with a schema that differs from the existing table schema. It merges the new schema into the table.
    Usage: Typically used when you are appending data.
  • overwriteSchema
    This option is used when you want to completely replace the schema of the table with the schema of the new data.
    Usage: Typically used when you are overwriting data.

mergeSchema

When new data has additional columns that aren’t present in the target Delta table, Delta Lake can automatically merge the new schema into the existing table schema.


# Append new data to the Delta table with automatic schema merging

df_new_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta-table")


overwriteSchema

If you want to replace the entire schema (including removing existing columns), you can use the overwriteSchema option.


# Overwrite the existing Delta table schema with new data

df_new_data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/path/to/delta-table")
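
The difference between the two options is easiest to see on the schemas themselves. The following plain-Python sketch models a schema as a dictionary of column name to type name. It is a simplified illustration, not Delta Lake's implementation: merge_schema and overwrite_schema are hypothetical functions, and treating every type difference as a conflict is an assumption of this model.

```python
from typing import Dict

Schema = Dict[str, str]  # column name -> type name (simplified)


def merge_schema(table: Schema, incoming: Schema) -> Schema:
    """Model of mergeSchema on append: keep existing columns, add new ones."""
    merged = dict(table)
    for name, dtype in incoming.items():
        if name in merged and merged[name] != dtype:
            # Simplification: any type difference counts as a conflict here.
            raise TypeError(f"type conflict on column {name!r}")
        merged.setdefault(name, dtype)
    return merged


def overwrite_schema(table: Schema, incoming: Schema) -> Schema:
    """Model of overwriteSchema on overwrite: the new schema replaces the old."""
    return dict(incoming)


current = {"id": "int", "name": "string"}
new_data = {"id": "int", "email": "string"}

print(merge_schema(current, new_data))
# {'id': 'int', 'name': 'string', 'email': 'string'}
print(overwrite_schema(current, new_data))
# {'id': 'int', 'email': 'string'}
```

With merging, the existing name column survives and email is added; with overwriting, name is gone because only the new schema remains.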


Configure spark.databricks.delta.schema.autoMerge.enabled

You can configure this setting at the following levels:

  • Session Level (applies to a specific session or job)
  • Cluster Level (applies to all jobs on the cluster)

Session-Level Configuration (Spark session level)

Once this is enabled, all write and merge operations in the session will automatically allow schema evolution.


# Enable auto schema merging for the session

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

Cluster-Level Configuration

This enables automatic schema merging for all operations on the cluster without needing to set it in each job.

  1. Go to your Databricks Workspace.
  2. Navigate to Clusters and select your cluster.
  3. Go to the Configuration tab.
  4. Under Spark Config, add the following configuration:
    spark.databricks.delta.schema.autoMerge.enabled true

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Delta: Time Travel of Delta Table

Time Travel in Delta Lake allows you to query, restore, or audit the historical versions of a Delta table. This feature is useful for various scenarios, including recovering from accidental deletions, debugging, auditing changes, or simply querying past versions of your data.

Delta Lake maintains a transaction log that records all changes (inserts, updates, deletes) made to the table. Using Time Travel, you can access a previous state of the table by specifying a version number or a timestamp.

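By default, data files that are no longer referenced by the table are retained for 7 days (delta.deletedFileRetentionDuration) and log files for 30 days (delta.logRetentionDuration). After 7 days, old data files become eligible for removal by VACUUM, so those versions can no longer be queried even though their log entries still exist.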

You can access historical versions of a Delta table using two methods:

  1. By Version Number
  2. By Timestamp

Viewing Table History

# sql
DESCRIBE HISTORY my_delta_table;

Query a Specific Version of a Table

You can query a Delta table as of a specific version number with the VERSION AS OF clause, or as of a point in time with the TIMESTAMP AS OF clause.


# sql
SELECT * FROM my_delta_table VERSION AS OF 5;


#Python
spark.sql("SELECT * FROM my_delta_table VERSION AS OF 5")

Restore the Delta Table to an Older Version

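You can use the RESTORE command to revert the Delta table to a previous state. This modifies the current state of the Delta table to match a past version or timestamp, and the restore itself is recorded as a new version in the table history. You can only restore to a version that is still retained: the transaction log is kept for 30 days by default, and the data files for that version must not have been removed by VACUUM.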

#sql
-- restore table to earlier version 4
-- by version
RESTORE TABLE delta.`abfss://container@adlsAccount.dfs.core.windows.net/myDeltaTable` TO VERSION AS OF 4;

-- by timestamp
RESTORE TABLE my_delta_table TO TIMESTAMP AS OF '2024-10-07T12:30:00';

#python
spark.sql("RESTORE TABLE my_delta_table TO VERSION AS OF 5")
spark.sql("RESTORE TABLE my_delta_table TO TIMESTAMP AS OF '2024-10-07T12:30:00'")
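
Because RESTORE changes the table, it is worth building the statement carefully. Here is a minimal plain-Python sketch that produces either form of the statement; restore_sql is a hypothetical helper, and rejecting timestamps that contain a single quote is a simplification for this example.

```python
from typing import Optional


def restore_sql(table: str, version: Optional[int] = None,
                timestamp: Optional[str] = None) -> str:
    """Build a RESTORE statement for exactly one of version / timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"RESTORE TABLE {table} TO VERSION AS OF {int(version)}"
    if "'" in timestamp:
        # Assumption of this sketch: keep the quoted literal simple and safe.
        raise ValueError("timestamp must not contain a single quote")
    return f"RESTORE TABLE {table} TO TIMESTAMP AS OF '{timestamp}'"


print(restore_sql("my_delta_table", version=5))
print(restore_sql("my_delta_table", timestamp="2024-10-07T12:30:00"))
# With an active Spark session: spark.sql(restore_sql("my_delta_table", version=5))
```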

Vacuum Command

The VACUUM command in Delta Lake is used to remove old files that are no longer in use by the Delta table. When you make updates, deletes, or upserts (MERGE) to a Delta table, Delta Lake creates new versions of the data while keeping older versions for Time Travel and data recovery. Over time, these old files can accumulate, consuming storage. The VACUUM command helps clean up these files to reclaim storage space.

By default, this command removes files that are no longer referenced by the table and are older than the retention threshold of 7 days.


# sql
VACUUM my_delta_table;

# python
spark.sql("VACUUM my_delta_table")

Retention Duration Check

The configuration property spark.databricks.delta.retentionDurationCheck.enabled controls whether Delta Lake enforces the retention period check for the VACUUM operation. By default, Delta Lake ensures that data files are only deleted after the default retention period (typically 7 days) to prevent accidentally deleting files that might still be required for Time Travel or recovery.


%sql
-- true (the default) enforces the check; false disables it
SET spark.databricks.delta.retentionDurationCheck.enabled = true;

When VACUUM is called, Delta Lake checks if the specified retention period is shorter than the minimum default (7 days). If it is, the VACUUM command will fail unless this safety check is disabled.

You can disable this check by setting the property spark.databricks.delta.retentionDurationCheck.enabled to false, which allows you to set a retention period of less than 7 days or even vacuum data immediately (0 hours).
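
The rule described above can be modeled in a few lines of plain Python. This is only an illustration of the check, not Delta Lake's implementation: vacuum_sql and DEFAULT_RETENTION_HOURS are hypothetical names, and the fixed 168-hour threshold stands in for the 7-day default mentioned in the text.

```python
DEFAULT_RETENTION_HOURS = 7 * 24  # 168 hours, the 7-day default named above


def vacuum_sql(table: str, retain_hours: int = DEFAULT_RETENTION_HOURS,
               check_enabled: bool = True) -> str:
    """Build a VACUUM statement, mimicking the retention duration check."""
    if retain_hours < 0:
        raise ValueError("retain_hours must be non-negative")
    if check_enabled and retain_hours < DEFAULT_RETENTION_HOURS:
        # Mirrors the described behavior: short retention fails unless
        # the safety check has been disabled.
        raise ValueError(
            f"retention of {retain_hours}h is below the "
            f"{DEFAULT_RETENTION_HOURS}h minimum; disable the check to proceed"
        )
    return f"VACUUM {table} RETAIN {retain_hours} HOURS"


print(vacuum_sql("my_delta_table"))
print(vacuum_sql("my_delta_table", retain_hours=1, check_enabled=False))
```

Calling vacuum_sql("my_delta_table", retain_hours=1) with the check left on raises a ValueError, which is the analogue of the failing VACUUM command.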

Disable the Retention Duration Check


#sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;

#python
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

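Set Log Retention Duration

Log retention is controlled by the table property delta.logRetentionDuration (30 days by default), which you set on the table itself:

#sql
-- Set the log retention duration to 7 days
ALTER TABLE my_delta_table SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 7 days');

# python
# Set the log retention duration to 7 days
spark.sql("ALTER TABLE my_delta_table SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 7 days')")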


Custom Retention Period


# sql
VACUUM my_delta_table RETAIN 1 HOURS;

# python
spark.sql("VACUUM my_delta_table RETAIN 1 HOURS")

Force Vacuum (Dangerous)


# sql
VACUUM my_delta_table RETAIN 0 HOURS;

Conclusion:

Delta Lake’s Time Travel feature is highly beneficial for data recovery, auditing, and debugging by enabling access to historical data versions. It provides flexibility to query and restore previous versions of the Delta table, helping maintain the integrity of large-scale data operations.


Delta Table, Delta Lake

A Delta table is a type of table that builds on the Delta Lake storage layer and brings ACID (Atomicity, Consistency, Isolation, Durability) transactions, schema enforcement, and scalable metadata management to traditional data lakes. It is designed for large-scale, reliable data processing and analytics. Delta tables enable you to manage both batch and streaming data with ease, and they are ideal for environments where data integrity and consistency are critical, such as in data lakes, data warehouses, and machine learning pipelines.

What is Delta Lake

Delta Lake is an open-source storage technology; we use it to store data in Delta tables. Delta Lake improves data storage by supporting ACID transactions, high-performance query optimizations, schema evolution, data versioning, and many other features.

Feature | Traditional Data Lakes | Delta Lake
--- | --- | ---
Transaction Support | No ACID transactions | Full ACID support
Data Consistency | Weak guarantees | Strong guarantees with serializable isolation
Schema Enforcement | None | Enforced and allows schema evolution
Handling Streaming | Requires separate infrastructure | Unified batch and streaming
Data Management | Prone to issues like data corruption | Reliable with audit trails and versioning

Table: Key differences

For more detail, see “Data lake vs delta lake vs data lakehouse, and data warehouses comparison”.

Key Features of Delta Tables

  1. ACID Transactions: Delta Lake ensures that operations like reads, writes, and updates are atomic, consistent, isolated, and durable, eliminating issues of partial writes and data corruption.
  2. Schema Enforcement: When writing data, Delta ensures that it matches the table’s schema, preventing incorrect or incomplete data from being written.
  3. Time Travel: Delta tables store previous versions of the data, which allows you to query, rollback, and audit historical data (also known as data versioning).
  4. Unified Streaming and Batch Processing: Delta tables allow you to ingest both batch and streaming data, enabling you to work seamlessly with either approach without complex rewrites.
  5. Efficient Data Upserts: You can perform MERGE operations (UPSERTS) efficiently, which is especially useful in scenarios where you need to insert or update data based on certain conditions.
  6. Optimized Performance: Delta Lake supports optimizations such as data skipping, Z-order clustering, and auto-compaction, improving query performance.

Creating and Using Delta Tables in PySpark or SQL

Create a Delta table by writing a DataFrame in PySpark, or by using SQL.

Create or Write a DataFrame to a Delta table

When you query a Delta table directly from ADLS using SQL, always wrap the path in backticks (`):

 
delta.`abfss://container@account.dfs.core.windows.net/path_and_table`

# python
# Write a DataFrame to a Delta table
df.write.format("delta").save("/mnt/delta/my_delta_table")


# sql
-- Creating a Delta Table
CREATE TABLE my_delta_table
USING delta
LOCATION '/mnt/delta/my_delta_table';

# sql
-- Insert data
INSERT INTO my_delta_table VALUES (1, 'John Doe'), (2, 'Jane Doe');

Reading from a Delta table


#python
delta_df = spark.read.format("delta").load("/mnt/delta/my_delta_table")
delta_df.show()


#sql
-- Query Delta table
SELECT * FROM my_delta_table;

-- Query a Delta table directly from ADLS.
-- Wrap the path in backticks (`).
SELECT * 
FROM 
delta.`abfss://adlsContainer@adlsAccount.dfs.core.windows.net/Path_and_TableName`
VERSION AS OF 4;
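
The abfss path has several parts that are easy to mix up, so a small builder can help. This is a plain-Python sketch: abfss_uri and versioned_select are hypothetical helpers, the container, account, and path values are the placeholders used above, and dfs.core.windows.net is assumed as the ADLS Gen2 endpoint suffix.

```python
def abfss_uri(container: str, account: str, path: str) -> str:
    """Build an abfss:// URI for a path inside an ADLS Gen2 container."""
    if not container or not account:
        raise ValueError("container and account are required")
    # Assumption: the standard ADLS Gen2 endpoint suffix.
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def versioned_select(uri: str, version: int) -> str:
    """Build a time-travel query over a path-based Delta table."""
    return f"SELECT * FROM delta.`{uri}` VERSION AS OF {int(version)}"


uri = abfss_uri("adlsContainer", "adlsAccount", "Path_and_TableName")
print(versioned_select(uri, 4))
# With an active Spark session: df = spark.sql(versioned_select(uri, 4))
```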

Managing Delta Tables

Optimizing Delta Tables

To improve performance, you can run an optimize operation to compact small files into larger ones.


# sql 
OPTIMIZE my_delta_table;

Z-order Clustering

Z-order clustering improves query performance by colocating related data in the same set of files, so that queries filtering on the Z-ordered columns can skip more files.


# sql
OPTIMIZE my_delta_table ZORDER BY (date);
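
To get an intuition for how a Z-order keeps related values close together, the sketch below interleaves the bits of two integers (a Morton code) and sorts points by the result. It illustrates the general idea of a Z-order curve only; it is not how Delta Lake implements ZORDER BY internally, and interleave_bits is a hypothetical function name.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low bits of x and y into a single Z-order key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # bits of y go to odd positions
    return z


# Sort a 4x4 grid of points by their Z-order key.
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))

print(ordered[:4])
# [(0, 0), (1, 0), (0, 1), (1, 1)]
```

The first four points form one 2x2 block of the grid, which is the sense in which values that are close in both columns end up next to each other.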

Upserts (Merge)

Delta Lake makes it easy to perform Upserts (MERGE operation), which allows you to insert or update data in your tables based on certain conditions.


# sql

MERGE INTO my_delta_table t
USING new_data n
ON t.id = n.id
WHEN MATCHED THEN UPDATE SET t.value = n.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (n.id, n.value); 
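
The two branches of the MERGE statement can be mirrored in plain Python, with a dictionary keyed by id standing in for the target table. This is a conceptual sketch, not Delta Lake's implementation: merge_upsert is a hypothetical function, and it simply applies source rows in order, whereas a real MERGE is expected to reject several source rows matching the same target row.

```python
from typing import Dict, Iterable, Tuple


def merge_upsert(target: Dict[int, str],
                 source: Iterable[Tuple[int, str]]) -> Tuple[Dict[int, str], int, int]:
    """Return (merged table, rows updated, rows inserted)."""
    result = dict(target)
    updated = inserted = 0
    for key, value in source:
        if key in result:
            result[key] = value   # WHEN MATCHED THEN UPDATE
            updated += 1
        else:
            result[key] = value   # WHEN NOT MATCHED THEN INSERT
            inserted += 1
    return result, updated, inserted


table = {1: "a", 2: "b"}
new_rows = [(2, "B"), (3, "c")]

merged, n_updated, n_inserted = merge_upsert(table, new_rows)
print(merged, n_updated, n_inserted)
# {1: 'a', 2: 'B', 3: 'c'} 1 1
```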

Conclusion

Delta Lake is a powerful solution for building reliable, high-performance data pipelines on top of data lakes. It enables advanced data management and analytics capabilities with features like ACID transactions, time travel, and schema enforcement, making it an ideal choice for large-scale, data-driven applications.

Delta tables are essential for maintaining high-quality, reliable, and performant data processing pipelines. They provide a way to bring transactional integrity and powerful performance optimizations to large-scale data lakes, enabling unified data processing for both batch and streaming use cases.
