Databricks
spark: RDD, Dataframe, Dataset, Transformation and Action
RDD, DataFrame, and Dataset are the core data structures used to handle distributed data.
- RDD (Resilient Distributed Datasets): Low-level abstraction for distributed data
- DataFrame: High-level abstraction for structured data (table)
- Dataset: Combines RDD and DataFrame, adds type safety (Scala/Java)
Transformations and Actions are two fundamental operations used to manipulate distributed data collections
- Transformation: lazy operations that define a new RDD, DataFrame, or Dataset, not immediately execute
- Actions: These are eager operations that trigger execution
A few Important Terminology of Databricks
Go through some key Databricks terms to give you an overview of the different points more…
Add a new user to workspace
To allow another user to use your Databricks workspace, follow these steps more …
☰ Unity Catalog
Unity Catalog is a unified governance solution for all data and AI assets, built on the Lakehouse platform. It provides centralized access control, auditing, lineage, and data discovery capabilities across Azure Databricks workspaces more …
Unity Catalog – Comparison: Unity Catalog, External Data Source, External Table, Mounting Data and Metastore
Comparison including Unity Catalog, Mounting Data, External Data Source, External Table and Metastore. more …
Unity Catalog – Create Metastore and Enabling Unity Catalog in Azure
Metastore register metadata about securable objects (such as tables, volumes, external locations, and shares) and the permissions that govern access to them. more …
Unity Catalog: Storage Credentials and External Locations
A storage credential is an authentication and authorization mechanism for accessing data stored on your cloud tenant.
By creating external locations, you can define a layer of security and manage access to these external data stores from within Databricks. more …
Unity Catalog – Storage Credentials and External Locations
A storage credential is an authentication and authorization mechanism for accessing data stored on your cloud tenant.
By creating external locations, you can define a layer of security and manage access to these external data stores from within Databricks. more …
Unity Catalog – Comparison: the Hive Metastore, Unity Catalog Metastore, and a general Metastore
side-by-side comparison of the Hive Metastore, Unity Catalog Metastore, and a general Metastore. more …
Unity Catalog – Create Catalogs and Schemas
A catalog is the primary unit of data organization in the Azure Databricks Unity Catalog data governance model.
Schema is a child of a catalog and can contain tables, views, volumes, models, and functions
# Create catalog
CREATE CATALOG [ IF NOT EXISTS ] <catalog-name>
[ MANAGED LOCATION '<location-path>' ]
[ COMMENT <comment> ];
# Create schema
CREATE { DATABASE | SCHEMA } [ IF NOT EXISTS ] <catalog-name>.<schema-name>
[ MANAGED LOCATION '<location-path>' | LOCATION '<location-path>']
[ COMMENT <comment> ]
[ WITH DBPROPERTIES ( <property-key = property_value [ , ... ]> ) ];
Unity Catalog – Creating Tables
Managed tables, External tables, Delta tables, Streaming tables
-- Creates a Delta table
> CREATE TABLE student (id INT, name STRING, age INT);
-- Use data from another table
> CREATE TABLE student_copy AS SELECT * FROM student;
-- Creates a CSV table from an external directory
> CREATE TABLE student USING CSV LOCATION '/path/to/csv_files';
-- Create partitioned table
> CREATE TABLE student (id INT, name STRING, age INT)
PARTITIONED BY (age);
--Create an external table
CREATE OR REPLACE TABLE my_table
USING DELTA -- Specify the data format (e.g., DELTA, PARQUET, etc.)
LOCATION 'abfss://<container>@<account>.dfs.core.windows.net/<path>'
PARTITIONED BY (year INT, month INT, day INT);
…. more …
Unity Catalog – Data Access Control with Databricks Unity Catalog
Explains how to control access to data and other objects in Unity Catalog, Principals, Privileges, Securable Objects. more …
Read table from Unity Catalog and write table to Unity Catalog
To read a table from and write a table to Unity Catalog in PySpark, we typically work with tables registered in the catalog rather than directly with file paths. Unity Catalog tables can be accessed using the format catalog_name.schema_name.table_name
.
df = spark.read.table("catalog_name.schema_name.table_name")
df = spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("catalog_name.schema_name.new_table_name")
DBFS: Databricks File System (DBFS)
The Databricks File System (DBFS) is a distributed file system integrated with Databricks that allows users to interact with object storage systems like Azure Blob Storage, Amazon S3, and Google Cloud Storage. more …
DBFS: Access ADLS or Blob using Service Principle with Oauth2
Using Service Principle with Oauth2 to access Storage. more …
ADB: Partition
In Databricks, partitioning is a strategy used to organize and store large datasets into smaller, more manageable chunks based on specific column values.
df.write.format("delta").partitionBy("sex","age").save("/mnt/delta/partitioned_data")
ADB: Comparison Partitioning Strategies and Methods
In distributed computing frameworks are used to distribute and manage data across nodes in a cluster. Some common partitioning techniques include hash partitioning, range partitioning, and others like broadcast joins (known as Round Robin). more …
☰ dbutils
dbutils – Databricks File System, dbutils summary
Databricks File System (DBFS) is a distributed file system mounted into a Databricks workspace and available on Databricks clusters. more …
dbutils – Secrets and Secret Scopes
A secret scope is a boundary within which secrets are stored. We can create secret scopes to securely manage access to sensitive data and credentials.
#Lists secret scopes
dbutils.secrets.listScopes()
#Lists all the secrets
dbutils.secrets.list("scope-name")
# Retrieves the secret
my_secret = dbutils.secrets.get(scope="my_scope", key="my_secret_key")
dbutils – Using Account Key or SAS to access adls or blob by mount
using dbutils.fs.mount(), with either an account key or a SAS token for authentication to mount ADLS or Blob.
dbutils.fs.mount(
Source=”wasbs://<contain-name>@<storage-account-name>.blob.core.windows.net”,
Mount_point = “/mnt/<mount-name>”,
Extra_configs = {“<conf-key>”:” account-key”}
)
dbutils – widget
dbutils.widgets
provide a way to create interactive controls like dropdowns, text inputs, and multi-selects.
# create widget
dbutils.widgets.text(“input_text”, “default_value”, “Text Input”)
CREATE WIDGET TEXT tableName DEFAULT ‘customers’
# Retrieving Widget Values
value = dbutils.widgets.get(“widget_name”)
SELECT * FROM ${getArgument(‘tableName’)}
dbutils – notebook run(), exit() and pass parameters
dbutils.notebook allows you to run other notebooks, pass parameters, capture return values, and manage notebook execution flows.
# run another notebook
dbutils.notebook.run("/path/to/other_notebook", 60, {"param1": "value1", "param2": "value2"})
# Exit a Notebook with value
dbutils.notebook.exit("Success")
# Return Value from a Notebook
result = dbutils.notebook.run("/path/to/notebook", 60, {"param": "value"})
print(result)
ADB — Comparison All-Purpose Cluster, Job Cluster, SQL Warehouse and Instance Pools
Clusters and SQL Warehouses (formerly SQL Endpoints) serve as core compute resources for executing workloads. Along with Instance Pools, they provide flexibility for managing and optimizing resources.
- All-Purpose Cluster: General-purpose compute
- Job Cluster: Dedicated to run a specific job or task.
- SQL Warehouse: Optimized for running SQL queries
- Instance Pools: pre-allocate virtual machines (VMs) to reduce cluster startup times and optimize costs.
☰ Delta : delta table, time travel, schema evolution
delta – Delta Table, Delta Lake
Delta table is a type of table that builds on the Delta Lake storage layer and brings ACID (Atomicity, Consistency, Isolation, Durability) transactions, schema enforcement, and scalable metadata management to traditional data lakes. more …
delta – Time Travel of Delta Table
Time Travel in Delta Lake allows you to query, restore, vacuum or audit the historical versions of a Delta table.
# sql
DESCRIBE HISTORY my_delta_table;
SELECT * FROM my_delta_table VERSION AS OF 5;
#Python
spark.sql("SELECT * FROM my_delta_table VERSION AS OF 5")
spark.sql("RESTORE TABLE my_delta_table TO VERSION AS OF 5")
spark.sql("RESTORE TABLE my_delta_table TO TIMESTAMP AS OF '2024-10-07T12:30:00'")
spark.sql("VACUUM my_delta_table")
delta – Schema Evolution
Schema Evolution in Databricks refers to the ability to automatically adapt and manage changes in the structure (schema) of a Delta Lake table over time.
# Enable auto schema merging for the session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Overwrite the existing Delta table schema with new data
df_new_data.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/path/to/delta-table")
# Append new data to the Delta table with automatic schema merging
df_new_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta-table")
Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS
Access a delta table saved in blob or adls using the file path rather than a cataloged name (like in Unity Catalog).
# Spark SQL
SELECT * FROM delta.`/mnt/path/to/delta/table`
INSERT INTO delta.`/mnt/path/to/delta/table`
caution: " ` " - backticks
# pyspark
df = spark.read.format("delta").load("path/to/delta/table")
df.write.format("delta").mode("overwrite").save("path/to/delta/table")
☰ PySpark
PySpark has been released in order to support the collaboration of Apache Spark and Python, it actually is a Python API for Spark. In addition, PySpark, helps you interface with Resilient Distributed Datasets (RDDs) in Apache Spark and Python programming language.
CSV – read and write
Read in dataframe and Write a dataframe to a csv file.
# reading from csv
spark.read.format(“csv”).options(options).load(file_location).schema(schema_df)
# writing to csv
df.write.format("csv").options(options).save("output/path/")
JSON – read, write and flattening nested json
Read a json file into dataframe and write a dataframe to a json file, flattening complex nested json by using PySpark’s select and explode functions to flatten the structure.
# reading from json
df = spark.read.option(options).schema(schame).json(“/path/file”)
#writing to json
df.write.option(options). mode(“overwrite”).json(“/path/output”)
Parquet – read and write
read a parquet file in dataframe and write a dataframe to a parquet file
# reading from Parquet file
df = spark.read.format("parquet").option("mergeSchema", "true") \
.load("/path/to/parquet/file/or/directory")
# Writing to Parquet file
df.write.format("parquet").mode("overwrite")
.option("compression", "snappy") \
.option("path", "/path/to/output/directory") \
.partitionBy("year", "month")
.save()
database: read/write database using JDBC
Read/Write Data from SQL Database using JDBC
# reading from
df = spark.read.jdbc(url=jdbc_url, table="tb",properties=con_properties)
# writing to
df.write.jdbc(url=jdbc_url, table="", mode="overwrite", properties=connection_properties)
☰ select (), withCloumn()
select ()
from pyspark.sql import functions as F
df.select(df[“column1”].alias(“new_column1”), df[“column2”])
df.select(F.col(“column1”), F.lit(“constant_value”), (F.col(“column2”) + 10).alias(“modified_column2”)).show()
df.select((df[“column1”] * 2).alias(“double_column1”), F.round(df[“column2”], 2).alias(“rounded_column2”))
df.select(“struct_column.field_name”).show()
df.selectExpr(“column1”, “column2 + 10 as new_column2”)
more …
withColumn
df.withColumn(“column_name”, expression)
df_new = df.withColumn(“New_Column”, lit(100))
df_arithmetic = df.withColumn(“New_ID”, col(“ID”) * 2 + 5)
df_concat = df.withColumn(“Full_Description”, concat(col(“Name”), lit(” has ID “), col(“ID”)))
df_conditional = df.withColumn(“Is_Adult”, when(col(“ID”) > 18, “Yes”).otherwise(“No”))
df_uppercase = df.withColumn(“Uppercase_Name”, upper(col(“Name”)))
more …
☰ arrayType functions
arrayType functions: more …
explode (): df.withColumn(“Cols”, explode(df.arrayCol)) more …
Split (): df.withColumn(“Name_Split”, split(df[“Name”], “,”)) more …
array (): df1.withColumn(“numbers”,array(col(“num1”),col(“num2”)))) more …
array_contains (): more …
getItem (): df.withColumn(“item_1_value”, df.arrayCol.getItem(1)) more …
size (): df.select(size(df.numbers)).show() more …
sort_array (): df.select(sort_array(“numbers”, asc=False).alias(“sorted_desc”)) more …
concat (): more …
array_zip (): more …
☰ mapType functions
map_keys (): df.select(map_keys(col(“col_name”)).alias(“key_name”)) more …
map_values(): df.select(map_values(col(“col_name”)).alias(“value_name”)) more …
explode (): df.select(“product_id”, explode(“prices”).alias(“currency”, “price”)) more …
getItem (): df.select( col(“product_id”), col(“prices”).getItem(“USD”).alias(“price_in_usd”)) more …
filtering: df.filter(col(“prices”).getItem(“USD”) > 150) more …
map_concat (): df.select(map_concat(col(“prices”), additional_currency)) more …
df.withColumnRenamed(existingName, newName) more …
df.show(n=x, truncate=True, vertical=False) more …
withColumnRenamed(), drop(), show()
df.withColumnRenamed(oldName, newName)
df_dropped_multiple = df.drop(“id”, “age”)
df.show(n=10, truncate= True, vertical=True)
arrayType, mapType column and functions
In PySpark, ArrayType
and MapType
are used to define complex data structures within a DataFrame schema.
Comparison: split (), concat (), array_zip (), and explode ()
Comparison: split (), concat (), array_zip (), and explode ()
split: Splits a string column into an array of substrings
concat: Concatenates multiple arrays or strings into a single array or string
array_zip: Zips multiple arrays element-wise into a single array of structs.
explode: Flattens an array into multiple rows, with one row per element in the array.
more …
from_json (), to_json ()
from pyspark.sql.functions import from_json, col
from_json(column, schema, options={})
from_json(col(“json_string”), [“name”,”age”])
from pyspark.sql.functions import to_json, col
to_json(column, options={})
to_json(col(“name”), {“pretty”: “true”}))
StructType(), StructField()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField(“name”, StringType(), True),
StructField(“age”, IntegerType(), True)
])
more …
☰ Condition: “if-then-else-” logic, when (), otherwise (), expr (), selectExpr (), coalesces (), isin (), between (), isNull (), isNotNull ()
conditional expressions, more…
if-then-else-” logic, more…
when (), otherwise () more…
expr (), selectExpr () more…
comparison: expr () and selectExpr () more…
coalesces () more…
isin (), between () more…
isNull (), isNotNull () more…
☰ join(), union(), unionByName(), fillna(), select()
join():
df1.join(df2, on=col(“ID”), how=”left”)
default: “inner”. semi and anti join are new comer, SQL does have these.
more …
union():
df1.union(df2)
df1.union(df2).union(df3).union(df4)
Column Names, Position and Data Types Must Match.
more …
unionByName:
df1.unionByName(df2, allowMissingColumns=Ture)
the schemas and order can be different in df1 and df2
more …
select():
df.select(“id”, “name”)
df.select(df.id, df.name)
df.select(df[“id”], df[“name”])
df.select(“name”, col(“age”) + 5)
more …
fillna():
df.fillna({“age”: 0, “dept”: “Unknown”})
df.fillna(0, subset=[“age”, “salary”])
more …
☰ distinct(), dropDuplicates(), orderBy(), sort(), groupBy(), agg()
distinct():
distinct_df = df.distinct()
more …
dropDuplicates ():
DataFrame.dropDuplicates([col1, col2, …, coln])
df_unique_names = df.dropDuplicates([“name”])
more …
orderBy(), sort():
DataFrame.orderBy(*cols, **kwargs)
DataFrame.sort(*cols, **kwargs)
more …
groupBy:
DataFrame.groupBy(*cols)
df_grouped_multi = df.groupBy(“department”, “gender”).count()
more …
agg():
DataFrame.agg(*exprs)
df_grouped_agg = df.groupBy(“department”).agg(
sum(“salary”).alias(“total_salary”),
avg(“salary”).alias(“avg_salary”),
count(“name”).alias(“num_employees”)
)
more …
☰ alias(), asc(), desc(), cast(), filter(), where(), like()
alias():
df.alias(“df1”);
df.select(df[“id”].alias(“new_ID”))
more …
asc (), desc ():
df.orderBy(asc(“age”)).show()
df.orderBy(desc(“age”)).show()
more …
cast ():
df[“column_name”].cast(“new_data_type”)
more …
filter (), where():
df.filter(condition) or df.where(condition) &
(AND) |
(OR) ~ (NOT)
df.filter((df[“age”] > 30) & (df[“salary”] > 50000))
df.where((df[“age”] > 30) & (df[“salary”] > 50000))
more …
like ():
df.filter(df[“column_name”].like(“pattern”))%
: Represents any sequence of characters._
: Represents a single character.
more …
☰ contain(), collect(), transform(), udf(), udf for SQL
contain ():
Column.contains(substring: str)
df.filter(df[“Department”].contains(“Sales”))
more …
collect ():
DataFrame.collect()
no parameter at all.
more …
transform ()
df1 = df.transform(my_function)
function name only.
more …
udf ()
from pyspark.sql.functions import udf
from pyspark.sql.types import DataType
my_udf = udf(py_function, returnType)
new_df= df.withColumn(“col_Name”, my_udff(df[“Name”]))
more …
udf () register for SQL Query
spark.udf.register(“registered_pyFun_for_sql”, original_pyFun, returnType)
spark.sql(“SELECT Name, registered_pyFun_for_sql(Name) AS alisa_name FROM tb_or_vw”)
more …
A complex python function declares, udf register, and apply example
spark.udf.register(“registered_pyFun_for_sql”, original_pyFun, returnType)
spark.sql(“SELECT Name, registered_pyFun_for_sql(Name) AS alisa_name FROM tb_or_vw”)
more …
Comparison of transform() and udf() in PySpark
udf () allows you to define and apply a user-defined function to a column or multiple columns.
transform () is a higher-order function available in PySpark that applies a custom function element-wise to each element in an array column.