Delta: Schema Evolution

Schema Evolution in Databricks refers to the ability to automatically adapt and manage changes in the structure (schema) of a Delta Lake table over time. It allows users to modify the schema of an existing table (e.g., adding or updating columns) without the need for a complete rewrite of the data.

Key Features of Schema Evolution

  1. Automatic Adaptation: Delta Lake can automatically evolve the schema of a table when new columns are added to the incoming data, or when data types change, if certain configurations are enabled.
  2. Backward and Forward Compatibility: Delta Lake ensures that new data can be written to a table without breaking the existing schema. It also ensures that existing queries remain compatible, even if the schema changes.

Configuration for Schema Evolution

  • mergeSchema
    This option allows you to append new data with a schema that differs from the existing table schema. It merges the new schema into the table.
    Usage: Typically used when you are appending data.
  • overwriteSchema
    This option is used when you want to completely replace the schema of the table with the schema of the new data.
    Usage: Typically used when you are overwriting data.

mergeSchema

When new data has additional columns that aren’t present in the target Delta table, Delta Lake can automatically merge the new schema into the existing table schema.


# Append new data to the Delta table with automatic schema merging

df_new_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta-table")


overwriteSchema

If you want to replace the entire schema (including removing existing columns), you can use the overwriteSchema option.


# Overwrite the existing Delta table schema with new data

df_new_data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/path/to/delta-table")
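To make the difference between the two options concrete, here is a toy model in plain Python. It is only a sketch of the idea, not Delta Lake's implementation: schemas are represented as hypothetical name-to-type dictionaries, and any type conflict is rejected, whereas the real mergeSchema can accept some compatible type changes.

```python
def merge_schema(table_schema, incoming_schema):
    """Toy mergeSchema: keep every existing column and append the new ones."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            # Simplification: this sketch rejects every type mismatch.
            raise ValueError(f"type conflict on column {name!r}")
        merged.setdefault(name, dtype)
    return merged


def overwrite_schema(table_schema, incoming_schema):
    """Toy overwriteSchema: the incoming schema replaces the old one entirely."""
    return dict(incoming_schema)


table = {"id": "int", "name": "string"}
incoming = {"id": "int", "email": "string"}

merged = merge_schema(table, incoming)        # existing columns plus "email"
replaced = overwrite_schema(table, incoming)  # "name" is gone
print(merged)
print(replaced)
```

With mergeSchema the old column survives and the new one is added; with overwriteSchema the table ends up with exactly the columns of the new data.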


Configure spark.databricks.delta.schema.autoMerge.enabled

You can configure this setting at the following levels:

  • Session Level (applies to a specific session or job)
  • Cluster Level (applies to all jobs on the cluster)

Session-Level Configuration (Spark session level)

Once this is enabled, all write and merge operations in the session will automatically allow schema evolution.


# Enable auto schema merging for the session

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

Cluster-Level Configuration

This enables automatic schema merging for all operations on the cluster without needing to set it in each job.

  1. Go to your Databricks Workspace.
  2. Navigate to Clusters and select your cluster.
  3. Go to the Configuration tab.
  4. Under Spark Config, add the following configuration:
    spark.databricks.delta.schema.autoMerge.enabled true

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Delta: Time Travel of Delta Table

Time Travel in Delta Lake allows you to query, restore, or audit the historical versions of a Delta table. This feature is useful for various scenarios, including recovering from accidental deletions, debugging, auditing changes, or simply querying past versions of your data.

Delta Lake maintains a transaction log that records all changes (inserts, updates, deletes) made to the table. Using Time Travel, you can access a previous state of the table by specifying a version number or a timestamp.

By default, removed data files are retained for 7 days and transaction log entries for 30 days. After 7 days, VACUUM can delete the old data files, while the log entries remain until their own retention period expires.

You can access historical versions of a Delta table using two methods:

  1. By Version Number
  2. By Timestamp

Viewing Table History

# sql
DESCRIBE HISTORY my_delta_table;

Query a Specific Version of the Table

You can query a Delta table as of a specific version number with the VERSION AS OF clause, or as of a point in time with the TIMESTAMP AS OF clause.


# sql
SELECT * FROM my_delta_table VERSION AS OF 5;


#Python
spark.sql("SELECT * FROM my_delta_table VERSION AS OF 5")
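The same time travel reads can also be expressed through the DataFrame reader, using the versionAsOf and timestampAsOf options. The sketch below assumes an existing SparkSession is passed in as spark; the helper names and the path argument are illustrative only.

```python
def read_version(spark, path, version):
    """Read a Delta table at a specific version number."""
    return spark.read.format("delta").option("versionAsOf", version).load(path)


def read_as_of(spark, path, timestamp):
    """Read a Delta table as it was at a given timestamp string."""
    return spark.read.format("delta").option("timestampAsOf", timestamp).load(path)


# Example usage in a notebook (path is a placeholder):
# old_df = read_version(spark, "/path/to/delta-table", 5)
# old_df = read_as_of(spark, "/path/to/delta-table", "2024-10-07T12:30:00")
```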

Restore the Delta Table to an Older Version

You can use the RESTORE command to revert the Delta table to a previous state permanently. This modifies the current state of the Delta table to match a past version or timestamp. You can only restore to a version whose data files and log entries are still retained (the transaction log retention period is 30 days by default).

#sql
-- restore table to earlier version 4
-- by version
RESTORE TABLE delta.`abfss://container@adlsAccount.dfs.windows.net/myDeltaTable` TO VERSION AS OF 4;

-- by timestamp
RESTORE TABLE my_delta_table TO TIMESTAMP AS OF '2024-10-07T12:30:00';

#python
spark.sql("RESTORE TABLE my_delta_table TO VERSION AS OF 5")
spark.sql("RESTORE TABLE my_delta_table TO TIMESTAMP AS OF '2024-10-07T12:30:00'")

Vacuum Command

The VACUUM command in Delta Lake is used to remove old files that are no longer in use by the Delta table. When you make updates, deletes, or upserts (MERGE) to a Delta table, Delta Lake creates new versions of the data while keeping older versions for Time Travel and data recovery. Over time, these old files can accumulate, consuming storage. The VACUUM command helps clean up these files to reclaim storage space.

By default, the following command removes files that are no longer referenced by the table and are older than 7 days.


# sql
VACUUM my_delta_table;

# python
spark.sql("VACUUM my_delta_table")

Retention Duration Check

The configuration property spark.databricks.delta.retentionDurationCheck.enabled controls whether Delta Lake enforces the retention period check for the VACUUM operation. By default, Delta Lake ensures that data files are only deleted after the default retention period (typically 7 days) to prevent accidentally deleting files that might still be required for Time Travel or recovery.


%sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false; -- or true

When VACUUM is called, Delta Lake checks if the specified retention period is shorter than the minimum default (7 days). If it is, the VACUUM command will fail unless this safety check is disabled.

You can disable this check by setting the property spark.databricks.delta.retentionDurationCheck.enabled to false, which allows you to set a retention period of less than 7 days or even vacuum data immediately (0 hours).
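The behaviour described above can be sketched in a few lines of plain Python. This is a simplified model under the assumptions stated in the text (a 7 day minimum and a single on/off flag), not Delta Lake's actual code; the function name is hypothetical.

```python
DEFAULT_RETENTION_HOURS = 7 * 24  # 168 hours, the default minimum


def check_vacuum_retention(retain_hours=None, check_enabled=True):
    """Return the retention to apply, or raise like the safety check would."""
    if retain_hours is None:
        # VACUUM without RETAIN uses the default threshold.
        return DEFAULT_RETENTION_HOURS
    if check_enabled and retain_hours < DEFAULT_RETENTION_HOURS:
        raise ValueError(
            f"retention of {retain_hours} hours is below the "
            f"{DEFAULT_RETENTION_HOURS} hour minimum; disable the check to proceed"
        )
    return retain_hours


print(check_vacuum_retention())                        # default retention
print(check_vacuum_retention(1, check_enabled=False))  # allowed once the check is off
```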

Disable the Retention Duration Check


#sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;

#python
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

Set Log Retention Duration


#sql 
-- Set the log retention duration to 7 days
SET spark.databricks.delta.logRetentionDuration = '7 days';

# python 
# Set the log retention duration to 7 days
spark.conf.set("spark.databricks.delta.logRetentionDuration", "7 days")


Custom Retention Period


# sql
VACUUM my_delta_table RETAIN 1 HOURS;

# python
spark.sql("VACUUM my_delta_table RETAIN 1 HOURS")

Force Vacuum (Dangerous)


# sql
VACUUM my_delta_table RETAIN 0 HOURS;

Conclusion:

Delta Lake’s Time Travel feature is highly beneficial for data recovery, auditing, and debugging by enabling access to historical data versions. It provides flexibility to query and restore previous versions of the Delta table, helping maintain the integrity of large-scale data operations.


Delta Table, Delta Lake

A Delta table is a type of table that builds on the Delta Lake storage layer and brings ACID (Atomicity, Consistency, Isolation, Durability) transactions, schema enforcement, and scalable metadata management to traditional data lakes. It is designed for large-scale, reliable data processing and analytics. Delta tables enable you to manage both batch and streaming data with ease, and they are ideal for environments where data integrity and consistency are critical, such as in data lakes, data warehouses, and machine learning pipelines.

What is Delta Lake

Delta Lake is an open-source technology that we use to store data in Delta tables. Delta Lake improves data storage by supporting ACID transactions, high-performance query optimizations, schema evolution, data versioning and many other features.

| Feature | Traditional Data Lakes | Delta Lake |
|---|---|---|
| Transaction Support | No ACID transactions | Full ACID support |
| Data Consistency | Weak guarantees | Strong guarantees with serializable isolation |
| Schema Enforcement | None | Enforced and allows schema evolution |
| Handling Streaming | Requires separate infrastructure | Unified batch and streaming |
| Data Management | Prone to issues like data corruption | Reliable with audit trails and versioning |

Key differences

For more detail, see “Data lake vs delta lake vs data lakehouse, and data warehouses comparison”.

Key Features of Delta Tables

  1. ACID Transactions: Delta Lake ensures that operations like reads, writes, and updates are atomic, consistent, isolated, and durable, eliminating issues of partial writes and data corruption.
  2. Schema Enforcement: When writing data, Delta ensures that it matches the table’s schema, preventing incorrect or incomplete data from being written.
  3. Time Travel: Delta tables store previous versions of the data, which allows you to query, rollback, and audit historical data (also known as data versioning).
  4. Unified Streaming and Batch Processing: Delta tables allow you to ingest both batch and streaming data, enabling you to work seamlessly with either approach without complex rewrites.
  5. Efficient Data Upserts: You can perform MERGE operations (UPSERTS) efficiently, which is especially useful in scenarios where you need to insert or update data based on certain conditions.
  6. Optimized Performance: Delta Lake supports optimizations such as data skipping, Z-order clustering, and auto-compaction, improving query performance.

Creating and Using Delta Tables in PySpark or SQL

You can create a Delta table by writing a DataFrame in PySpark or by using SQL.

Create or Write a DataFrame to a Delta table

When you query a Delta table directly from ADLS using SQL, always wrap the path in backticks:

 
-- note the backticks (`) around the path
delta.`abfss://container@account.dfs.windows.net/path_and_table`

# python
# Write a DataFrame to a Delta table
df.write.format("delta").save("/mnt/delta/my_delta_table")


# sql
-- Creating a Delta Table
CREATE TABLE my_delta_table
USING delta
LOCATION '/mnt/delta/my_delta_table';

# sql
-- Insert data
INSERT INTO my_delta_table VALUES (1, 'John Doe'), (2, 'Jane Doe');

Reading from a Delta table


#python
delta_df = spark.read.format("delta").load("/mnt/delta/my_delta_table")
delta_df.show()


#sql
-- Query Delta table
SELECT * FROM my_delta_table;

-- directly query delta table from adls.
-- wrap the path in backticks (`)
SELECT * 
FROM 
delta.`abfss://adlsContainer@adlsAccount.dfs.windows.net/Path_and_TableName`
VERSION AS OF 4;

Managing Delta Tables

Optimizing Delta Tables

To improve performance, you can run an optimize operation to compact small files into larger ones.


# sql 
OPTIMIZE my_delta_table;

Z-order Clustering

Z-order clustering improves query performance by colocating related data in the same set of files. It is a technique used in Delta Lake (and other databases) to optimize data layout.


# sql
OPTIMIZE my_delta_table ZORDER BY (date);

Upserts (Merge)

Delta Lake makes it easy to perform Upserts (MERGE operation), which allows you to insert or update data in your tables based on certain conditions.


# sql

MERGE INTO my_delta_table t
USING new_data n
ON t.id = n.id
WHEN MATCHED THEN UPDATE SET t.value = n.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (n.id, n.value); 
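The same upsert can be written with the Delta Lake Python API. The sketch below assumes the delta-spark package is available, that target is a DeltaTable, and that updates_df is a DataFrame with id and value columns; the function name upsert is hypothetical.

```python
def upsert(target, updates_df):
    """Merge updates_df into the target DeltaTable, matching rows on id."""
    (
        target.alias("t")
        .merge(updates_df.alias("n"), "t.id = n.id")
        # Matched rows: overwrite the value column.
        .whenMatchedUpdate(set={"value": "n.value"})
        # Unmatched rows: insert them as new rows.
        .whenNotMatchedInsert(values={"id": "n.id", "value": "n.value"})
        .execute()
    )


# Example usage in a notebook (assumes new_data_df already exists):
# from delta.tables import DeltaTable
# upsert(DeltaTable.forName(spark, "my_delta_table"), new_data_df)
```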

Conclusion

Delta Lake is a powerful solution for building reliable, high-performance data pipelines on top of data lakes. It enables advanced data management and analytics capabilities with features like ACID transactions, time travel, and schema enforcement, making it an ideal choice for large-scale, data-driven applications.

Delta tables are essential for maintaining high-quality, reliable, and performant data processing pipelines. They provide a way to bring transactional integrity and powerful performance optimizations to large-scale data lakes, enabling unified data processing for both batch and streaming use cases.


Comparison of Partitioning Strategies and Methods

In distributed computing frameworks like Apache Spark (and PySpark), different partitioning strategies are used to distribute and manage data across nodes in a cluster. These strategies influence how data is partitioned, which affects the performance of your jobs. Some common partitioning techniques include hash partitioning, range partitioning, and others like broadcast joins.

Key Differences Between Partitioning Methods

| Partitioning Method | Key Feature | Best For | Shuffling | Effect on Data Layout |
|---|---|---|---|---|
| partitionBy() | General partitioning | Optimizing data layout on disk (file system) | No | Organizes data into folders by column values |
| Hash Partitioning | Evenly distributes data based on a hash function | Operations such as joins and groupBy, when you need uniform distribution | Yes | Redistributes data across partitions evenly |
| Round Robin | Simple, even distribution of rows | Even row distribution without considering values | Yes | Distributes rows evenly across partitions |
| Range Partitioning | Data is divided based on sorted ranges | Queries based on ranges, such as time-series data | Yes (if internal) | Data is sorted and divided into ranges across partitions |
| Custom Partitioning | Custom logic for partitioning | When you have specific partitioning needs not covered by standard methods | Yes (if internal) | Defined by custom function |
| Co-location of Partitions | Partition both datasets by the same key for optimized joins | Joining two datasets with the same key | No (if already co-located) | Ensures both datasets are partitioned the same way |
| Broadcast Join | Sends smaller datasets to all nodes to avoid shuffles | Joins where one dataset is much smaller than the other | No (avoids shuffle) | Broadcasts small dataset across nodes for local join |

Key Takeaways

  • partitionBy() is used for data organization on disk, especially when writing out data in formats like Parquet or ORC.
  • Hash Partitioning and Round Robin Partitioning are used for balancing data across Spark partitions while a job is processing it.

General Partitioning

Use partitionBy() when writing data to disk to optimize data layout and enable efficient querying later.


df.write.format("delta").partitionBy("gender", "age").save("/mnt/delta/partitioned_data")

The data is saved in one folder per partition value, for example gender=F/age=34.

Hash Partitioning


df = df.repartition(10, 'class_id')

Hash partitioning is used internally within Spark’s distributed execution to split the data across multiple nodes for parallel processing. It splits the data so that elements with the same hash (computed from a key, several keys, or a function) end up in the same partition.

Hash partitioning is applied during processing within Spark. It redistributes the data across partitions based on a hash of the column values, which spreads the load across nodes for tasks like joins and aggregations. It involves shuffling.
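The core idea, that rows with the same key always land in the same partition, can be illustrated without a cluster. The following is a toy simulation in plain Python: it uses zlib.crc32 as a stand-in hash (Spark uses its own hash function internally), and the sample rows are made up for illustration.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Assign each row to a partition based on a hash of its key column."""
    partitions = defaultdict(list)
    for row in rows:
        digest = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[digest % num_partitions].append(row)
    return dict(partitions)


rows = [
    {"class_id": "A", "student": 1},
    {"class_id": "B", "student": 2},
    {"class_id": "A", "student": 3},
    {"class_id": "C", "student": 4},
    {"class_id": "B", "student": 5},
]
parts = hash_partition(rows, "class_id", 3)
for partition_id, members in sorted(parts.items()):
    print(partition_id, members)
```

Every row with the same class_id ends up under the same partition id, which is what lets joins and aggregations on that key run locally within a partition.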

Round Robin Partitioning

Round robin partitioning evenly distributes records across partitions in a circular fashion, meaning each row is assigned to the next available partition.
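A minimal sketch of the circular assignment in plain Python follows. It is a simplified model (the function name is hypothetical, and Spark's own implementation differs in details such as where the rotation starts), but it shows why the resulting partition sizes differ by at most one row.

```python
def round_robin_partition(rows, num_partitions):
    """Deal rows out to partitions one at a time, like dealing cards."""
    partitions = [[] for _ in range(num_partitions)]
    for index, row in enumerate(rows):
        partitions[index % num_partitions].append(row)
    return partitions


parts = round_robin_partition(list(range(10)), 3)
print(parts)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

In PySpark, calling df.repartition(n) without any column arguments produces this kind of value-independent distribution.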

Range Partitioning

Range partitioning is similar to hash partitioning, only it’s based on a range of values rather than a hash.
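As a rough illustration, the plain Python sketch below assigns values to partitions using fixed boundaries. The boundaries here are chosen by hand as an assumption; Spark's df.repartitionByRange(n, col) determines its ranges by sampling the data instead.

```python
import bisect


def range_partition(values, boundaries):
    """Place each value in the partition whose range contains it.

    boundaries must be sorted; partition i holds values below boundaries[i],
    and the last partition holds everything else.
    """
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for value in values:
        partitions[bisect.bisect_right(boundaries, value)].append(value)
    return partitions


parts = range_partition([5, 42, 17, 88, 63, 29], boundaries=[30, 60])
print(parts)  # [[5, 17, 29], [42], [88, 63]]
```

Because each partition covers a contiguous range, a query that filters on a range of values only needs to touch the partitions that overlap it.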

Broadcast Join (Replication Partitioning)

A broadcast join (also known as replication partitioning) sends the smaller dataset to every node in the cluster, so each node holds a full copy of it. Each partition of the larger dataset can then be joined with the smaller dataset locally, without requiring a shuffle.
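The mechanism can be mimicked in plain Python: build a lookup from the small dataset once, then let every partition of the large dataset join against its own copy. This is a toy inner join with made-up sample data, not Spark code.

```python
def broadcast_join(large_partitions, small_rows, key):
    """Join each partition of the large dataset against a replicated lookup."""
    lookup = {row[key]: row for row in small_rows}  # the "broadcast" copy
    joined = []
    for partition in large_partitions:
        # Each partition is processed on its own; no rows move between partitions.
        local = [{**row, **lookup[row[key]]} for row in partition if row[key] in lookup]
        joined.append(local)
    return joined


orders = [
    [{"country": "CA", "amount": 10}, {"country": "US", "amount": 5}],
    [{"country": "CA", "amount": 7}, {"country": "FR", "amount": 3}],
]
countries = [
    {"country": "CA", "name": "Canada"},
    {"country": "US", "name": "United States"},
]
result = broadcast_join(orders, countries, "country")
print(result)
```

In PySpark the equivalent hint is large_df.join(broadcast(small_df), "country"), with broadcast imported from pyspark.sql.functions.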

Detailed Comparison of Each Partitioning Method

| Partitioning Method | Purpose | When Used | Shuffling | How It Works |
|---|---|---|---|---|
| General Partitioning (partitionBy()) | Organizing data on disk (file partitioning) | When writing data (e.g., Parquet, ORC) | No shuffle | Data is partitioned into folders by column values when writing to disk |
| Hash Partitioning (repartition(column_name)) | Evenly distributing data for parallel processing | During processing for joins, groupBy, etc. | Yes (shuffle data across nodes) | Applies a hash function to the column value to distribute data evenly across partitions |
| Round Robin Partitioning | Distributes rows evenly without considering values | When you want even distribution but don’t need value-based grouping | Yes (shuffle) | Rows are evenly assigned to partitions in a circular manner, disregarding content |
| Range Partitioning | Distribute data into partitions based on a range of values | When processing or writing range-based data (e.g., dates) | Yes (if used internally during processing) | Data is sorted by the partitioning column and divided into ranges across partitions |
| Custom Partitioning | Apply custom logic to determine how data is partitioned | For complex partitioning logic in special use cases | Yes (depends on logic) | User-defined partitioning function determines partition assignment |
| Co-location Partitioning | Ensures two datasets are partitioned the same way (to avoid shuffling during joins) | To optimize joins when both datasets have the same partitioning column | No (if already partitioned the same way) | Both datasets are partitioned by the same key (e.g., by user_id) to avoid shuffle during joins |
| Broadcast Join (Partitioning) | Send a small dataset to all nodes for local joins without shuffle | When joining a small dataset with a large one | No shuffle (avoids shuffle by broadcasting) | The smaller dataset is broadcast to each node, avoiding the need for shuffling large data |


Partitioning in Databricks

In Databricks, partitioning is a strategy used to organize and store large datasets into smaller, more manageable chunks based on specific column values. Partitioning can improve query performance and resource management when working with large datasets in Spark, especially in distributed environments like Databricks.

Key Concepts of Partitioning in Databricks

Partitioning in Tables:

When saving a DataFrame as a table or Parquet file in Databricks, you can specify partitioning columns to divide the data into separate directories. Each partition contains a subset of the data based on the values of the partitioning column(s).

Partitioning in DataFrames

Spark partitions data in-memory across nodes in the cluster to parallelize processing. Partitioning helps distribute the workload evenly across the cluster.

Types of Partitioning

Static Partitioning (Manual Partitioning)

When saving or writing data to a file or table, you can manually specify one or more columns to partition the data by. This helps when querying large tables, as Spark can scan only the relevant partitions instead of the entire dataset.

Dynamic Partitioning (Automatic Partitioning)

Spark automatically partitions a DataFrame based on the size of the data and available resources. The number of partitions is determined by Spark’s internal algorithm based on the data’s size and complexity.


Partitioning in Databricks File System (DBFS)

When writing data to files in Databricks (e.g., Parquet, Delta), you can specify partitioning columns to optimize reads and queries. For example, when you partition by a column, Databricks will store the data in different folders based on that column’s values.


# Example of saving a DataFrame with partitioning
df.write.partitionBy("gender").parquet("/mnt/data/name_partitioned")

In this example, the data will be saved in a directory structure like:

/mnt/data/name_partitioned/gender=F
/mnt/data/name_partitioned/gender=M

Partitioning in Delta Tables

In Delta Lake (an open-source storage layer that sits on top of your data lake storage), partitioning is also a best practice to optimize data management and queries. When you define a Delta table, you can specify partitions to enable efficient query pruning, which results in faster reads and reduced I/O.


# Writing a Delta table with partitioning
df.write.format("delta").partitionBy("gender", "age").save("/mnt/delta/partitioned_data")

In this example, the data will be saved in a directory structure like:

/mnt/delta/partitioned_data/gender=F/age=34
/mnt/delta/partitioned_data/gender=F/age=45
/mnt/delta/partitioned_data/gender=M/age=23
/mnt/delta/partitioned_data/gender=M/age=26
/mnt/delta/partitioned_data/gender=M/age=32
/mnt/delta/partitioned_data/gender=M/age=43
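The folder names above follow a simple column=value convention, nested in the order the columns were passed to partitionBy(). A small plain Python sketch of how such a path is composed (the helper name is hypothetical):

```python
def partition_path(base_path, partition_values):
    """Build a column=value folder path for one combination of partition values."""
    segments = [f"{column}={value}" for column, value in partition_values.items()]
    return "/".join([base_path.rstrip("/"), *segments])


print(partition_path("/mnt/delta/partitioned_data", {"gender": "F", "age": 34}))
# /mnt/delta/partitioned_data/gender=F/age=34
```

A query that filters on gender = 'F' only needs to read the folders under gender=F, which is the pruning effect described above.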

Optimizing Spark DataFrame Partitioning

When working with in-memory Spark DataFrames in Databricks, you can manually control the number of partitions to optimize performance.

Repartition

This increases or decreases the number of partitions.
This operation reshuffles the data, redistributing it into a new number of partitions.


df = df.repartition(10)  # repartition into 10 partitions

Coalesce

This reduces the number of partitions without triggering a full shuffle, which usually makes it more efficient than repartition() when you only need fewer partitions.


df = df.coalesce(5) # reduce partitions to 5
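One way to combine the two operations is a small helper that picks coalesce() when shrinking and repartition() when growing. This is only a sketch: the function name is hypothetical, and it assumes df is a Spark DataFrame whose current partition count can be read with df.rdd.getNumPartitions().

```python
def resize_partitions(df, target):
    """Return df with `target` partitions, avoiding a full shuffle when shrinking."""
    current = df.rdd.getNumPartitions()
    if target < current:
        # Fewer partitions: merge existing ones without a full shuffle.
        return df.coalesce(target)
    if target > current:
        # More partitions: a shuffle is required to spread the rows out.
        return df.repartition(target)
    return df


# Example usage in a notebook:
# df = resize_partitions(df, 5)
```

Keep in mind that coalesce() can leave partitions unevenly sized, so repartition() may still be the better choice when balance matters more than avoiding the shuffle.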

When to Use Partitioning

  • Partitioning works best when you frequently query the data using the columns you’re partitioning by. For example, partitioning by date (e.g., year, month, day) is a common use case when working with time-series data.
  • Don’t over-partition: Too many partitions can lead to small file sizes, which increases the overhead of managing the partitions.

Summary

  • Partitioning divides data into smaller, more manageable chunks.
  • It improves query performance by allowing Spark to read only relevant data.
  • You can control partitioning when saving DataFrames or Delta tables to optimize storage and query performance.
  • Use repartition() or coalesce() to manage in-memory partitions for better parallelization.
  • Use coalesce() to reduce partitions without shuffling.
  • Use repartition() when you need to rebalance data.

Azure Data Factory or Synapse Copy Activity with File System

In Azure Data Factory (ADF) or Synapse, using Copy Activity with a File System as a source or sink is common when dealing with on-premises file systems, network file shares, or even cloud-based file storage systems. Here’s an overview of how it works, key considerations, and steps to set it up.

Key Components and setup with File System:

Create a File System Linked Service

Linked Service: For on-premises or network file systems, you typically need a Self-hosted Integration Runtime (SHIR).

Fill in the required fields:

  • Connection: Specify the file system type (e.g., network share or local path).
  • Authentication: Provide the appropriate credentials, such as username/password, or key-based authentication.
  • If the file system is on-premises, configure the Self-hosted Integration Runtime to access it.

Create File System Dataset

Go to Datasets in ADF and create a new dataset. Select File System as the data source.

Configure the dataset to point to the correct file or folder:

  • Specify the File Path.
  • Define the file format (e.g., CSV, JSON, XML).
  • Set any schema information if required (for structured data like CSV).

Considerations:

  • Integration Runtime: For on-premises file systems, the Self-hosted Integration Runtime (SHIR) is essential to securely move data from private networks.
  • Performance: Data transfer speeds depend on network configurations (for on-prem) and ADF’s parallelism settings.
  • File Formats: Ensure proper handling of different file formats (e.g., CSV, JSON, Binary etc.) and schema mapping for structured files.
  • Security: Ensure credentials and network configurations are correctly set up, and consider encryption if dealing with sensitive data.

Common Errors:

  • Connection issues: If the SHIR is not correctly configured, or if there are issues with firewall or network settings, ADF may not be able to access the file system.
  • Permission issues: Ensure that the correct permissions are provided to access the file system (file share, SMB, FTP, etc.).

Comparison between All-Purpose Cluster, Job Cluster, SQL Warehouse and Instance Pools

Below is a side-by-side comparison of “All-Purpose Cluster”, “Job Cluster”, “SQL Warehouse” and Instance Pools in Azure Databricks, covering their key features, use cases, and differences:

Key Differences

  • All-Purpose Cluster: Best for interactive workloads, collaborative notebooks, and exploration. It stays running until you manually stop it or it hits the idle timeout. Not as cost-effective for long-running or scheduled tasks.
  • Job Cluster: Best for scheduled and automated jobs. It starts automatically when the job begins and shuts down after the job finishes, which makes it cost-efficient and ideal for production ETL or data processing jobs.
  • SQL Warehouse: Best for SQL analytics and BI tool integration. It is specifically optimized for SQL queries, offering auto-scaling based on query load and cost-efficient SQL query execution on Delta Lake tables.
  • Instance Pools: Best for reducing startup times for frequently created clusters and for sharing resources among multiple teams or clusters.

Side by side comparison

| | All-Purpose Cluster | Job Cluster | SQL Warehouse (formerly SQL Endpoints) | Instance Pools |
|---|---|---|---|---|
| Purpose | General-purpose compute environment for interactive workloads. | Dedicated to run a specific job or task. Automatically terminates after the job. | Optimized for running SQL queries, dashboards, and BI analytics on Delta Lake. | Resource management feature that pre-allocates virtual machines (VMs) to reduce cluster startup times and optimize costs. |
| Usage | For interactive development in notebooks, collaboration, and ad-hoc analysis. | For scheduled or automated jobs (e.g., ETL tasks) that need to run Spark-based processing. | For SQL-based workloads, querying data in Delta Lake, and BI tools (e.g., Power BI, Tableau). | Supporting clusters |
| Primary Workload | Interactive development (notebooks, data exploration, ad-hoc queries). | Automated Spark jobs with dedicated, isolated clusters for each job. | SQL analytics and dashboards, running SQL queries against Delta Lake tables. | Resource optimization |
| Cluster Lifecycle | Remains active until manually terminated or idle timeout is reached. | Created automatically when a job is triggered, and terminated when the job is done. | SQL Warehouses scale up/down based on query demand; remain active based on usage settings. | Pre-warmed VMs (idle terminate) |
| Resource Allocation | Configurable resources, manual start/stop, and autoscaling available. | Dynamically allocated resources based on job requirements, with autoscaling. | Autoscaling based on SQL query demand; optimized for SQL workloads. | |
| Cost | Always running unless manually stopped or auto-terminated, can be expensive if left running. | More cost-efficient for scheduled jobs, as the cluster runs only during the job execution. | Efficient for SQL queries with autoscaling; cost based on query execution. | Optimizes cluster creation |
| Performance | Good for interactive, collaborative workloads but may incur higher costs if not optimized. | Highly performant for running isolated, parallel jobs without interference from other workloads. | Optimized for low-latency SQL query performance and concurrent query execution. | |
| Scaling | Can scale automatically based on workload demand (within limits set by the user). | Scales based on the job’s needs; new clusters can be created for each job. | Scales automatically to accommodate concurrent SQL queries. | |
| Isolation | Not isolated: multiple users can share the cluster, which may impact performance. | Fully isolated: each job runs on a separate cluster. | Isolated SQL queries but shared resources for concurrent workloads. | Shared resource pool |
| Ideal For | Data exploration, notebook development, machine learning experiments, ad-hoc queries. | Scheduled ETL/ELT jobs, production jobs, or one-time data processing tasks. | SQL analytics, dashboards, and BI tool integration for querying Delta Lake. | Supporting clusters |
| Supported Languages | Python, Scala, R, SQL, and more via notebooks. | Python, Scala, R, SQL (job-specific). | SQL only. | |
| Management | Requires manual monitoring and termination. | Automatic termination after job completion. | Automatically managed scaling and uptime based on usage. | Faster cluster launches |
| Example Use Case | Running notebooks to explore and analyze data, performing machine learning experiments. | Running a scheduled Spark job that processes data in a pipeline or transformation. | Running SQL queries on Delta Lake, powering dashboards, or connecting to BI tools. | |
| Restart Behavior | Can be manually stopped and restarted; the Cluster ID remains the same. | Automatically created and terminated for each job run; new Cluster ID for each job. | SQL Warehouse remains active based on usage, auto-scaling handles load; Warehouse ID remains the same. | Faster cluster launches |

Side-by-side comparison of cluster types.

Summary:

  • All-Purpose Clusters are ideal for interactive data exploration and multi-user environments, but they can be costly if left running for too long.
  • Job Clusters are used for single, isolated tasks (like scheduled ETL jobs) and are cost-effective since they are automatically created and terminated.
  • SQL Warehouses are specialized for SQL queries and business intelligence reporting, offering cost efficiency through on-demand scaling for SQL analytics.

dbutils: Databricks File System, dbutils

Databricks File System (DBFS) is a distributed file system mounted into a Databricks workspace and available on Databricks clusters. DBFS is an abstraction on top of scalable object storage.

Databricks recommends that you store data in mounted object storage rather than in the DBFS root. The DBFS root is not intended for production customer data.

The DBFS root is the default file system location provisioned for a Databricks workspace when the workspace is created. It resides in the cloud storage account associated with the Databricks workspace.

Databricks dbutils

dbutils is a set of utility functions provided by Databricks to help manage and interact with various resources in a Databricks environment, such as files, jobs, widgets, secrets, and notebooks. It is commonly used in Databricks notebooks to perform tasks like handling file systems, retrieving secrets, running notebooks, and controlling job execution.

dbutils.help()

  • credentials: DatabricksCredentialUtils -> Utilities for interacting with credentials within notebooks
  • data: DataUtils -> Utilities for understanding and interacting with datasets (EXPERIMENTAL)
  • fs: DbfsUtils -> Manipulates the Databricks filesystem (DBFS) from the console
  • jobs: JobsUtils -> Utilities for leveraging jobs features
  • library: LibraryUtils -> Utilities for session isolated libraries
  • meta: MetaUtils -> Methods to hook into the compiler (EXPERIMENTAL)
  • notebook: NotebookUtils -> Utilities for the control flow of a notebook (EXPERIMENTAL)
  • preview: Preview -> Utilities under preview category
  • secrets: SecretUtils -> Provides utilities for leveraging secrets within notebooks
  • widgets: WidgetsUtils -> Methods to create and get bound value of input widgets inside notebooks

dbutils.fs (File System Utilities)

dbutils.fs.help()

dbutils.fs provides utilities to interact with various file systems, like DBFS (Databricks File System), Azure Blob Storage, and others, similarly to how you would interact with a local file system.

List Files:

dbutils.fs.ls("/mnt/")

Mount Azure Blob Storage:


dbutils.fs.mount(
    source = "wasbs://<container>@<storage-account>.blob.core.windows.net",
    mount_point = "/mnt/myblobstorage",
    extra_configs = {"<key>": "<value>"}
)

For Azure Blob: wasbs://
For Azure Data Lake Gen2: abfss://
For S3: s3a://
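
The three schemes differ only in how the container (or bucket) and the account are arranged in the URI. Below is a minimal sketch of building the `source` argument for `dbutils.fs.mount`. The helper name `mount_source` and the sample container and account names are hypothetical, and the `dfs.core.windows.net` host for ADLS Gen2 is assumed from the `abfss://` paths used elsewhere in these notes.

```python
def mount_source(scheme: str, container: str, account: str = "") -> str:
    """Build a storage URI suitable for the `source` argument of dbutils.fs.mount."""
    if scheme == "wasbs":   # Azure Blob Storage
        return f"wasbs://{container}@{account}.blob.core.windows.net"
    if scheme == "abfss":   # Azure Data Lake Storage Gen2
        return f"abfss://{container}@{account}.dfs.core.windows.net"
    if scheme == "s3a":     # Amazon S3: the "container" is the bucket name
        return f"s3a://{container}"
    raise ValueError(f"unsupported scheme: {scheme}")


# Hypothetical names, for illustration only
print(mount_source("wasbs", "mycontainer", "myaccount"))
```

Raising on an unknown scheme keeps a typo such as `wasb` from silently producing a URI that fails later at mount time.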

Unmount

dbutils.fs.unmount("/mnt/myblobstorage")
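
Mounting a path that is already mounted raises an error, so notebooks that may be re-run often check first. Here is a small sketch of that check, assuming `dbutils.fs.mounts()` returns entries with a `mountPoint` attribute. The helper name `mount_if_absent` is hypothetical, and `dbutils` is passed in as an argument so the function can also be exercised outside a notebook.

```python
def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless that mount point is already in use.

    Returns True if a new mount was created, False if one already existed.
    """
    # Collect the mount points that currently exist in the workspace
    existing = {m.mountPoint for m in dbutils.fs.mounts()}
    if mount_point in existing:
        return False
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=extra_configs)
    return True
```

In a notebook this would be called as `mount_if_absent(dbutils, "wasbs://<container>@<storage-account>.blob.core.windows.net", "/mnt/myblobstorage", {"<key>": "<value>"})`.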

Copy Files:

dbutils.fs.cp("/mnt/source_file.txt", "/mnt/destination_file.txt")

Remove Files:

dbutils.fs.rm("/mnt/myfolder/", True)  # True to remove recursively

Move Files:

dbutils.fs.mv("/mnt/source_file.txt", "/mnt/destination_file.txt")

dbutils.secrets (Secret Management)

dbutils.secrets is used to retrieve secrets stored in Databricks Secret Scopes. This is essential for securely managing sensitive data like passwords, API keys, or tokens.

dbutils.secrets.help()

Get a Secret:

my_secret = dbutils.secrets.get(scope="my-secret-scope", key="my-secret-key")

List Secrets:

dbutils.secrets.list(scope="my-secret-scope")

List Secret Scopes:

dbutils.secrets.listScopes()
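
A common use of secrets is to keep a storage account key out of notebook source when building the `extra_configs` for a `wasbs://` mount. The sketch below assumes the account key has already been stored in a secret scope. The helper name `account_key_config` is hypothetical, and `dbutils` is passed in explicitly so the function is testable outside a notebook.

```python
def account_key_config(dbutils, account, scope, key):
    """Return extra_configs for a wasbs:// mount, reading the account key from a secret scope."""
    conf_name = f"fs.azure.account.key.{account}.blob.core.windows.net"
    # The secret value never appears in the notebook source, only the scope and key names do
    return {conf_name: dbutils.secrets.get(scope=scope, key=key)}
```

The returned dictionary can be passed directly as `extra_configs` to `dbutils.fs.mount`.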

dbutils.notebook (Notebook Workflows)

dbutils.notebook provides functionality to run one notebook from another and pass data between notebooks. It is useful when you want to build modular pipelines by chaining multiple notebooks.

dbutils.notebook.help()

Run Another Notebook:

dbutils.notebook.run("/path/to/other_notebook", 60, {"param1": "value1", "param2": "value2"})

Runs another notebook with specified timeout (in seconds) and parameters. You can pass parameters as a dictionary.

Exit a Notebook:

dbutils.notebook.exit("Success")

Exits the notebook with a status message or value.

Return Value from a Notebook:

result = dbutils.notebook.run("/path/to/notebook", 60, {"param": "value"})
print(result)
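
Because `dbutils.notebook.exit` hands back a single string, returning several values usually means serializing them. This is one possible sketch using JSON; the helper names `pack_result` and `unpack_result` and the `rows` field are hypothetical.

```python
import json


def pack_result(status, **values):
    """Serialize a status plus extra values into one string for dbutils.notebook.exit."""
    return json.dumps({"status": status, **values})


def unpack_result(raw):
    """Parse the string returned by dbutils.notebook.run back into a dict."""
    return json.loads(raw)


# In the child notebook:   dbutils.notebook.exit(pack_result("ok", rows=120))
# In the calling notebook: result = unpack_result(dbutils.notebook.run(...))
payload = pack_result("ok", rows=120)
print(unpack_result(payload)["rows"])  # prints 120
```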

dbutils.jobs (Job Utilities)

dbutils.jobs helps tasks in the same job run share small values with one another through the taskValues subutility.

dbutils.jobs.help()

Get a Task Value

# taskKey is the name of the upstream task that set the value; key is the name it was stored under
task_value = dbutils.jobs.taskValues.get(taskKey="<task_key>", key="<key>", default=0, debugValue=0)
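
Task values are written by one task and read by a later task in the same job run. Here is a minimal sketch of both sides, assuming the `taskValues.set(key, value)` and `taskValues.get(taskKey, key, default, debugValue)` calls. The function names and the `row_count` key are hypothetical, and `dbutils` is passed in so the helpers can be tried outside a job.

```python
def publish_row_count(dbutils, row_count):
    """Upstream task: store a value under the key "row_count" for later tasks."""
    dbutils.jobs.taskValues.set(key="row_count", value=row_count)


def read_row_count(dbutils, upstream_task_key):
    """Downstream task: read the value set by the task named `upstream_task_key`."""
    return dbutils.jobs.taskValues.get(
        taskKey=upstream_task_key,
        key="row_count",
        default=0,     # used when the key was never set
        debugValue=0,  # used when the notebook runs outside of a job
    )
```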

dbutils.library

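Manages session-isolated (notebook-scoped) libraries. On recent Databricks Runtime versions the install functions have been removed in favor of the %pip magic command, so the call below applies to older runtimes only.

dbutils.library.installPyPI("numpy")  # on newer runtimes: %pip install numpy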

Example

# Mount Azure Blob Storage using dbutils.fs
dbutils.fs.mount(
    source = "wasbs://mycontainer@myaccount.blob.core.windows.net",
    mount_point = "/mnt/mydata",
    extra_configs = {"fs.azure.account.key.myaccount.blob.core.windows.net": "<storage-key>"}
)

# List contents of the mount
display(dbutils.fs.ls("/mnt/mydata"))

# Get a secret from a secret scope
db_password = dbutils.secrets.get(scope="my-secret-scope", key="db-password")

# Create a dropdown widget to choose a dataset
dbutils.widgets.dropdown("dataset", "dataset1", ["dataset1", "dataset2", "dataset3"], "Choose Dataset")

# Get the selected dataset value
selected_dataset = dbutils.widgets.get("dataset")
print(f"Selected dataset: {selected_dataset}")

# Remove all widgets after use
dbutils.widgets.removeAll()

# Run another notebook and pass parameters
result = dbutils.notebook.run("/path/to/notebook", 60, {"input_param": "value"})
print(result)

Magic Command

list

| Aspect | %fs (Magic Command) | %sh (Magic Command) | dbutils.fs (Databricks Utilities) | os.<> (Python OS Module) |
|---|---|---|---|---|
| Example Usage | %fs ls /databricks-datasets | %sh ls /tmp | dbutils.fs.ls("/databricks-datasets") | import os; os.listdir("/tmp") |
| Cloud Storage Mounts | Can access mounted cloud storage paths. | No, unless the cloud storage is accessible from the driver node. | Can mount and access external cloud storage (e.g., S3, Azure Blob) to DBFS. | No access to mounted DBFS or cloud storage. |
| Use Case | Lightweight access to DBFS for listing, copying, removing files. | Execute system-level commands from notebooks. | Programmatic, flexible access to DBFS and cloud storage. | Access files and environment variables on the local node. |

Unity Catalog: Creating Tables

A table resides in a schema and contains rows of data. All tables created in Azure Databricks use Delta Lake by default. Tables backed by Delta Lake are also called Delta tables.

A Delta table stores data as a directory of files in cloud object storage and registers table metadata to the metastore within a catalog and schema. All Unity Catalog managed tables and streaming tables are Delta tables. Unity Catalog external tables can be Delta tables but are not required to be.

Table types

Managed tables: Managed tables manage underlying data files alongside the metastore registration.

External tables: External tables, sometimes called unmanaged tables, decouple the management of underlying data files from metastore registration. Unity Catalog external tables can store data files using common formats readable by external systems.

Delta tables: The term Delta table describes any table backed by Delta Lake. Because Delta tables are the default on Azure Databricks, most references to tables describe Delta tables unless otherwise noted.

Streaming tables: Streaming tables are Delta tables primarily used for processing incremental data.

Foreign tables: Foreign tables represent data stored in external systems connected to Azure Databricks through Lakehouse Federation. 

Feature tables: Any Delta table managed by Unity Catalog that has a primary key is a feature table.

Hive tables (legacy): The term Hive tables covers two distinct legacy concepts on Azure Databricks. One of them is tables registered using the legacy Hive metastore, which store data in the legacy DBFS root by default.

Live tables (deprecated): The term live tables refers to an earlier implementation of functionality now implemented as materialized views.

Basic Permissions

To create a table, users must have CREATE TABLE and USE SCHEMA permissions on the schema, and they must have the USE CATALOG permission on its parent catalog. To query a table, users must have the SELECT permission on the table, the USE SCHEMA permission on its parent schema, and the USE CATALOG permission on its parent catalog.
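
The query-side requirement can be read as one grant per namespace level. Below is a small sketch that generates the corresponding `GRANT` statements. The helper name `query_grants` and the catalog, schema, table, and group names are hypothetical.

```python
def query_grants(catalog, schema, table, principal):
    """Return the GRANT statements that give `principal` the privileges needed to query one table."""
    who = f"`{principal}`"
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO {who};",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO {who};",
        f"GRANT SELECT ON TABLE {catalog}.{schema}.{table} TO {who};",
    ]


# Hypothetical names, for illustration only
for stmt in query_grants("main", "sales", "orders", "analysts"):
    print(stmt)
```

Each statement could then be run with `spark.sql(...)` or pasted into a SQL cell.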

Create a managed table


CREATE TABLE <catalog-name>.<schema-name>.<table-name>
(
  <column-specification>
);

Create Table (Using)


-- Creates a Delta table
CREATE TABLE student (id INT, name STRING, age INT);

-- Use data from another table
CREATE TABLE student_copy AS SELECT * FROM student;

-- Creates a CSV table from an external directory
CREATE TABLE student USING CSV LOCATION '/path/to/csv_files';

-- Specify table comment and properties
CREATE TABLE student (id INT, name STRING, age INT)
    COMMENT 'this is a comment'
    TBLPROPERTIES ('foo'='bar');

-- Specify table comment and properties with different clauses order
CREATE TABLE student (id INT, name STRING, age INT)
    TBLPROPERTIES ('foo'='bar')
    COMMENT 'this is a comment';

-- Create partitioned table
CREATE TABLE student (id INT, name STRING, age INT)
    PARTITIONED BY (age);

-- Create a table with a generated column
CREATE TABLE rectangles(a INT, b INT,
                        area INT GENERATED ALWAYS AS (a * b));

Create Table Like

Defines a table using the definition and metadata of an existing table or view.


-- Create table using a new location
CREATE TABLE Student_Dupli LIKE Student LOCATION '/path/to/data_files';

-- Create table like using a data source
CREATE TABLE Student_Dupli LIKE Student USING CSV LOCATION '/path/to/csv_files';

Create or modify a table using file upload

Create an external table

To create an external table, you can use SQL commands or DataFrame write operations.


CREATE TABLE <catalog>.<schema>.<table-name>
(
  <column-specification>
)
LOCATION 'abfss://<bucket-path>/<table-directory>';

Query results or DataFrame write operations

Many users create managed tables from query results or DataFrame write operations. 

%sql

-- Create a managed table from query results (CTAS)
CREATE TABLE student_copy AS SELECT * FROM student;

-- Create a CSV table from an external directory, with reader options
CREATE TABLE DB1.tb_from_csv
    USING CSV
    OPTIONS (
    path '/path/to/csv_files',
    header 'true',
    inferSchema 'true'
);

Partition discovery for external tables

To enable partition metadata logging on a table, you must enable a Spark conf for your current SparkSession and then create an external table. 


SET spark.databricks.nonDelta.partitionLog.enabled = true;

CREATE OR REPLACE TABLE <catalog>.<schema>.<table-name>
USING <format>
PARTITIONED BY (<partition-column-list>)
LOCATION 'abfss://<bucket-path>/<table-directory>';

Example: create or replace a partitioned external table with partition discovery

CREATE OR REPLACE TABLE my_table
USING PARQUET -- A non-Delta format such as PARQUET; the setting above targets non-Delta tables
LOCATION 'abfss://<container>@<account>.dfs.core.windows.net/<path>'
PARTITIONED BY (year INT, month INT, day INT);

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Appendix:

MS: What is a table

Unity Catalog: Catalogs and Schemas

A catalog is the primary unit of data organization in the Azure Databricks Unity Catalog data governance model. It is the first layer in Unity Catalog's three-level namespace (catalog.schema.table-etc). Catalogs contain schemas, which in turn can contain tables, views, volumes, models, and functions. Catalogs are registered in a Unity Catalog metastore in your Azure Databricks account.
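
In code, the three-level namespace shows up as a dotted name. This is a short sketch of assembling one safely, backtick-quoting each part so that names containing hyphens or other special characters still parse. The helper name `qualified_name` and the sample names are hypothetical.

```python
def qualified_name(catalog, schema, obj):
    """Join the three namespace levels, backtick-quoting each part."""
    def quote(part):
        # A literal backtick inside an identifier is escaped by doubling it
        return "`" + part.replace("`", "``") + "`"
    return ".".join(quote(p) for p in (catalog, schema, obj))


# Hypothetical names, for illustration only
print(qualified_name("prod", "finance", "daily-revenue"))  # prints `prod`.`finance`.`daily-revenue`
```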

Catalogs

Organize my data into catalogs

Each catalog should represent a logical unit of data isolation and a logical category of data access, allowing an efficient hierarchy of grants to flow down to schemas and the data objects that they contain. 

Catalogs therefore often mirror organizational units or software development lifecycle scopes. You might choose, for example, to have a catalog for production data and a catalog for development data, or a catalog for non-customer data and one for sensitive customer data.

Data isolation using catalogs

Each catalog typically has its own managed storage location to store managed tables and volumes, providing physical data isolation at the catalog level. 

Catalog-level privileges

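Grants on any Unity Catalog object are inherited by children of that object, so a privilege granted on a catalog also applies to the schemas and objects it contains. Owning a catalog is therefore a powerful position.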

Catalog types

  • Standard catalog: the typical catalog, used as the primary unit to organize your data objects in Unity Catalog. 
  • Foreign catalog: a Unity Catalog object that is used only in Lakehouse Federation scenarios.

Default catalog

If your workspace was enabled for Unity Catalog automatically, the pre-provisioned workspace catalog is specified as the default catalog. A workspace admin can change the default catalog as needed.

Workspace-catalog binding

If you use workspaces to isolate user data access, you might want to use workspace-catalog bindings. Workspace-catalog bindings enable you to limit catalog access by workspace boundaries. 

Create catalogs

Requirements: be an Azure Databricks metastore admin or have the CREATE CATALOG privilege on the metastore

To create a catalog, you can use Catalog Explorer, a SQL command, the REST API, the Databricks CLI, or Terraform. When you create a catalog, two schemas (databases) are automatically created: default and information_schema.

Catalog Explorer

  • Log in to a workspace that is linked to the metastore.
  • Click Catalog.
  • Click the Create Catalog button.
  • On the Create a new catalog dialog, enter a Catalog name and select the catalog Type that you want to create:
    ◦ Standard catalog: a securable object that organizes data and AI assets that are managed by Unity Catalog. For all use cases except Lakehouse Federation and catalogs created from Delta Sharing shares.
    ◦ Foreign catalog: a securable object that mirrors a database in an external data system using Lakehouse Federation.
    ◦ Shared catalog: a securable object that organizes data and other assets that are shared with you as a Delta Sharing share. Creating a catalog from a share makes those assets available for users in your workspace to read.

SQL

standard catalog

CREATE CATALOG [ IF NOT EXISTS ] <catalog-name>
   [ MANAGED LOCATION '<location-path>' ]
   [ COMMENT <comment> ];

  • <catalog-name>: A name for the catalog.
  • <location-path>: Optional but strongly recommended. 
    e.g. <location-path>: 'abfss://my-container-name@storage-account-name.dfs.core.windows.net/finance' or 'abfss://my-container-name@storage-account-name.dfs.core.windows.net/finance/product'

shared catalog

CREATE CATALOG [ IF NOT EXISTS ] <catalog-name>
USING SHARE <provider-name>.<share-name>
[ COMMENT <comment> ];

foreign catalog

CREATE FOREIGN CATALOG [IF NOT EXISTS] <catalog-name> USING CONNECTION <connection-name>
OPTIONS [(database '<database-name>') | (catalog '<external-catalog-name>')];

  • <catalog-name>: Name for the catalog in Azure Databricks.
  • <connection-name>: The connection object that specifies the data source, path, and access credentials.
  • <database-name>: Name of the database you want to mirror as a catalog in Azure Databricks. Not required for MySQL, which uses a two-layer namespace. For Databricks-to-Databricks Lakehouse Federation, use catalog '<external-catalog-name>' instead.
  • <external-catalog-name>: Databricks-to-Databricks only: Name of the catalog in the external Databricks workspace that you are mirroring.

Schemas

A schema is a child of a catalog and can contain tables, views, volumes, models, and functions. Schemas provide more granular categories of data organization than catalogs.

Precondition

  • Have a Unity Catalog metastore linked to the workspace where you perform the schema creation
  • Have the USE CATALOG and CREATE SCHEMA data permissions on the schema’s parent catalog
  • To specify an optional managed storage location for the tables and volumes in the schema, an external location must be defined in Unity Catalog, and you must have the CREATE MANAGED STORAGE privilege on the external location.

Create a schema

To create a schema in Unity Catalog, you can use Catalog Explorer or SQL commands.

To create a schema in Hive metastore, you must use SQL commands.

Catalog Explorer

  • Log in to a workspace that is linked to the Unity Catalog metastore.
  • Click Catalog.
  • In the Catalog pane on the left, click the catalog you want to create the schema in.
  • In the detail pane, click Create schema.
  • Give the schema a name and add any comment that would help users understand the purpose of the schema.
  • (Optional) Specify a managed storage location. Requires the CREATE MANAGED STORAGE privilege on the target external location. See Specify a managed storage location in Unity Catalog and Managed locations for schemas.
  • Click Create.
  • Grant privileges on the schema. See Manage privileges in Unity Catalog.
  • Click Save.

SQL


CREATE { DATABASE | SCHEMA } [ IF NOT EXISTS ] <catalog-name>.<schema-name>
    [ MANAGED LOCATION '<location-path>' | LOCATION '<location-path>']
    [ COMMENT <comment> ]
    [ WITH DBPROPERTIES ( <property-key = property_value [ , ... ]> ) ];
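
When schemas are created from a notebook or a deployment script, the optional clauses in the template above are often filled in programmatically. Below is a minimal sketch that builds the statement as a string. The helper name `create_schema_sql` and the sample names are hypothetical, and values are inserted verbatim, so it assumes trusted inputs that contain no single quotes.

```python
def create_schema_sql(catalog, schema, managed_location=None, comment=None):
    """Build a CREATE SCHEMA statement, adding optional clauses only when provided."""
    parts = [f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}"]
    if managed_location:
        parts.append(f"MANAGED LOCATION '{managed_location}'")
    if comment:
        parts.append(f"COMMENT '{comment}'")
    # One clause per line, indented under the first line
    return "\n    ".join(parts) + ";"


# Hypothetical names, for illustration only
print(create_schema_sql("prod", "finance", comment="Finance reporting tables"))
```

The resulting string could be passed to `spark.sql(...)` in a notebook.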


Appendix:

MS: What are catalogs in Azure Databricks?

MS: What are schemas in Azure Databricks?