Locking Mechanisms in Relational Database Management Systems (RDBMS)

In relational databases, locks are essential mechanisms for managing concurrent access to data. They prevent data corruption and ensure data consistency when multiple transactions try to read or modify the same data simultaneously.

Without locks, concurrent transactions could lead to several problems. For example:

  • Dirty Reads: a transaction reads data that has been modified by another transaction but not yet committed (a short sketch follows this list);
  • Lost Updates: one transaction’s updates are overwritten by another transaction;
  • Non-Repeatable Reads: a transaction reads the same data multiple times and, due to updates by other transactions, gets different results on each read;
  • Phantom Reads: a transaction executes the same query multiple times and, due to insertions or deletions by other transactions, gets a different result set each time.
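To make the dirty-read problem concrete, here is a minimal T-SQL sketch, assuming a hypothetical Accounts table with Id and Balance columns; session 2 sees the uncommitted change only because it runs under READ UNCOMMITTED.

-- Session 1: modify a row but do not commit yet
BEGIN TRANSACTION;
UPDATE Accounts SET Balance = Balance - 100 WHERE Id = 1;
-- (no COMMIT yet)

-- Session 2: reads the uncommitted change (a dirty read)
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
SELECT Balance FROM Accounts WHERE Id = 1;

-- Session 1: roll back, so the value session 2 read never officially existed
ROLLBACK TRANSACTION;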

Here’s a detailed breakdown of locks in relational databases.

Types of Locks

Relational databases use various types of locks with different levels of restriction:

Shared Lock

Allows multiple read operations simultaneously. Prevents write operations until the lock is released.

Example: SELECT statements in many databases.
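A hedged SQL Server illustration, using a hypothetical Employees table: a plain SELECT takes shared (S) locks only while reading under READ COMMITTED, while the HOLDLOCK hint keeps the shared lock until the transaction ends, so concurrent readers proceed but writers wait.

BEGIN TRANSACTION;
-- Acquire and hold shared (S) locks on the qualifying rows until COMMIT
SELECT * FROM Employees WITH (HOLDLOCK) WHERE DeptID = 1;
-- Another session can still read these rows, but an UPDATE against them will wait
COMMIT TRANSACTION;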

Exclusive Lock

Allows a single transaction to modify data. Prevents other operations (read or write) until the lock is released.

Example: UPDATE, DELETE.
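A minimal sketch of an exclusive lock in SQL Server, reusing the hypothetical Employees table: the UPDATE takes an exclusive (X) lock on the affected row, and other sessions cannot modify it (or read it without hints, unless row versioning is enabled) until the transaction ends.

BEGIN TRANSACTION;
-- Takes an exclusive (X) lock on the row with EmployeeID = 10
UPDATE Employees SET Position = 'Manager' WHERE EmployeeID = 10;
-- Other writers (and blocked readers) wait here until COMMIT or ROLLBACK
COMMIT TRANSACTION;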

Update Lock

Prevents deadlocks when a transaction might upgrade a shared lock to an exclusive lock.
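In SQL Server, the usual way to request an update lock explicitly is the UPDLOCK table hint in a read-then-write pattern; a rough sketch with the same hypothetical Employees table:

BEGIN TRANSACTION;
-- The U lock prevents two sessions from both holding S locks on the row
-- and then deadlocking when each tries to upgrade to an X lock
SELECT Salary FROM Employees WITH (UPDLOCK) WHERE EmployeeID = 10;

UPDATE Employees SET Salary = Salary * 1.05 WHERE EmployeeID = 10;
COMMIT TRANSACTION;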

Intent Lock

Indicates the type of lock a transaction intends to acquire at a lower level of granularity. Intent Shared (IS): the transaction intends to acquire shared locks at a lower granularity level. Intent Exclusive (IX): the transaction intends to acquire exclusive locks at a lower granularity level.
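Intent locks are taken automatically by the engine; you normally only observe them. In SQL Server, one way to see them is to query sys.dm_tran_locks while another session holds row locks in an open transaction, which shows the IS/IX locks on the page and table above the row-level locks:

-- Inspect the lock hierarchy (row S/X locks plus IS/IX locks on page and object)
SELECT resource_type, request_mode, request_status, request_session_id
FROM sys.dm_tran_locks
WHERE resource_database_id = DB_ID();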

Lock Granularity

Locks can be applied at different levels of granularity.

Row-Level Lock

Locks a specific row in a table. Provides the highest concurrency, but if many rows are locked, it may lead to lock-management overhead.
Example: Updating a specific record (UPDATE ... WHERE id = 1).

Page-Level Lock

Locks a data page, a block of rows. Provides a compromise between concurrency and overhead.

(a page is a fixed-size storage unit)

Table-Level Lock

Locks an entire table. Provides the lowest concurrency but minimal overhead.

Example: Prevents any modifications to the table during an operation like ALTER TABLE.
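Besides DDL such as ALTER TABLE, a table-level lock can also be requested explicitly in SQL Server with a table hint; a small sketch, again using the hypothetical Employees table:

BEGIN TRANSACTION;
-- TABLOCKX takes an exclusive lock on the whole table for the transaction
SELECT COUNT(*) FROM Employees WITH (TABLOCKX);
-- All other readers and writers of Employees wait until COMMIT
COMMIT TRANSACTION;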

Lock Duration

Transaction Locks: Held until the transaction is committed or rolled back.

Session Locks: Held for the duration of a session.

Temporary Locks: Released immediately after the operation completes.
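As one illustration of these durations in SQL Server, application locks taken with sp_getapplock can be owned either by the current transaction or by the session; this is only a sketch of the two scopes, using a hypothetical resource name:

-- Transaction-scoped lock: released automatically at COMMIT or ROLLBACK
BEGIN TRANSACTION;
EXEC sp_getapplock @Resource = 'NightlyLoad', @LockMode = 'Exclusive', @LockOwner = 'Transaction';
COMMIT TRANSACTION;  -- lock released here

-- Session-scoped lock: held until explicitly released or the session ends
EXEC sp_getapplock @Resource = 'NightlyLoad', @LockMode = 'Exclusive', @LockOwner = 'Session';
EXEC sp_releaseapplock @Resource = 'NightlyLoad', @LockOwner = 'Session';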

Deadlock Prevention and Handling

A deadlock occurs when two or more transactions are waiting for each other to release locks. Databases employ deadlock detection and resolution mechanisms to handle such situations.

Prevent Deadlocks

Avoid Mutual Exclusion
Use resources that allow shared access (e.g., shared locks for read-only operations).

 Eliminate Hold and Wait
Require transactions to request all resources they need at the beginning. If any resource is unavailable, the transaction must wait without holding any resources.

Allow Preemption
If a transaction requests a resource that is held by another, the system can preempt (forcefully release) the resource from the holding transaction. The preempted transaction is rolled back and restarted.

Break Circular Wait
Impose a global ordering on resources and require transactions to request resources in that order. For example, if resources are ordered as R1, R2, R3, a transaction must request R1 before R2, and R2 before R3.
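A hedged T-SQL sketch of such an ordering, with two hypothetical tables: because both transactions touch TableA before TableB, the circular wait behind the classic two-table deadlock cannot form; the later transaction simply waits.

-- Both transactions follow the same global order: TableA first, then TableB
-- Transaction 1
BEGIN TRANSACTION;
UPDATE TableA SET Col1 = 1 WHERE Id = 1;
UPDATE TableB SET Col1 = 1 WHERE Id = 1;
COMMIT TRANSACTION;

-- Transaction 2 (same order, so it blocks briefly instead of deadlocking)
BEGIN TRANSACTION;
UPDATE TableA SET Col1 = 2 WHERE Id = 1;
UPDATE TableB SET Col1 = 2 WHERE Id = 1;
COMMIT TRANSACTION;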

    Handle Deadlocks

    If deadlocks cannot be prevented, the database system must detect and resolve them. Here’s how deadlocks are typically handled:

    Deadlock Detection
    The database system periodically checks for deadlocks by analyzing the wait-for graph, which represents transactions and their dependencies on resources. If a cycle is detected in the graph, a deadlock exists.

    Deadlock Resolution
    Once a deadlock is detected, the system must resolve it by choosing a victim transaction to abort. The victim is typically selected based on criteria such as:

    • Transaction Age: Abort the newest or oldest transaction.
    • Transaction Progress: Abort the transaction that has done the least work.
    • Priority: Abort the transaction with the lowest priority.

    The aborted transaction is rolled back, releasing its locks and allowing other transactions to proceed.
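    In SQL Server specifically, you can influence victim selection with SET DEADLOCK_PRIORITY, and the victim's batch receives error 1205, which the application should catch and retry; a brief sketch using the hypothetical Employees table:

    -- Mark this session as a preferred deadlock victim (LOW | NORMAL | HIGH, or -10..10)
    SET DEADLOCK_PRIORITY LOW;

    BEGIN TRY
        BEGIN TRANSACTION;
        UPDATE Employees SET Position = 'Lead' WHERE EmployeeID = 10;
        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;
        IF ERROR_NUMBER() = 1205
            PRINT 'Chosen as the deadlock victim; retry the transaction';
        ELSE
            THROW;
    END CATCH;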

    Conclusion

    Locks are crucial for ensuring data consistency and integrity in relational databases. Understanding the different types of locks, lock granularity, locking protocols, and isolation levels is essential for database developers and administrators to design and manage concurrent applications effectively.

    Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

    (remove all space from the email account 😊)

    Using Exists Transformation for Data Comparison in Azure Data Factory/Synapse

    In this article, I will discuss the Exists transformation in Data Flow. The exists transformation is a row-filtering transformation that checks whether your data exists in another source or stream. The output stream includes all rows in the left stream that either exist or don’t exist in the right stream. The exists transformation is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

    I use the Exists transformation in Azure Data Factory or Synapse data flows to compare source and target data. This is the most straightforward and generally preferred option.

    Create a Data Flow

    Create a Source

    Create a DerivedColumn Transformation

    The expression uses: sha2(256, columns())

    Create target and derivedColumn transformation

    Create the target the same way as the source. To keep the data types the same so that we can compare hash values, I add a Cast transformation;

    then, the same as the source setting, add a DerivedColumn transformation.

    Exists Transformation to compare Source and target

    Add an Exists transformation to compare source and target.

    The Exists function offers two options: Exists and Doesn’t Exist. It supports multiple criteria and custom expressions.

    Configuration

    1. Choose which data stream you’re checking for existence in the Right stream dropdown.
    2. Specify whether you’re looking for the data to exist or not exist in the Exist type setting.
    3. Select whether or not you want a Custom expression.
    4. Choose which key columns you want to compare as your exists conditions. By default, data flow looks for equality between one column in each stream. To compare via a computed value, hover over the column dropdown and select Computed column.

    “Exists” option

    Now, let’s use the “Exists” option.

    We get depid = 1004, which exists in both source and target.

    Doesn’t Exist

    Use the “Doesn’t Exist” option.

    We get depid = 1003 (wholesale), which exists on the source side but does NOT exist in the target.

    Recap

    The “Exists Transformation” is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

    It is very convenient for comparisons in data engineering projects, e.g. ETL source-to-target comparison; a SQL analogue is sketched below.
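    For reference, a rough SQL analogue of the two options, assuming hypothetical source and target tables keyed by depid (NOT EXISTS corresponds to the “Doesn’t Exist” setting):

    -- Rows in source that also exist in target ("Exists")
    SELECT s.*
    FROM source s
    WHERE EXISTS (SELECT 1 FROM target t WHERE t.depid = s.depid);

    -- Rows in source that are missing from target ("Doesn't Exist")
    SELECT s.*
    FROM source s
    WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.depid = s.depid);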


    Comparison of Azure SQL Managed Instance, Azure SQL Database, Azure SQL Server

    Azure offers several SQL-related services, each tailored to different use cases and requirements. Below is a comparison of Azure SQL Managed Instance, Azure SQL Database, and Azure SQL Server (often referred to as a logical SQL Server in Azure).

    Azure SQL Database

    • Description: A fully managed, platform-as-a-service (PaaS) relational database offering. It is designed for modern cloud applications and supports single databases and elastic pools.
    • Use Cases:
      • Modern cloud-native applications.
      • Microservices architectures.
      • Applications requiring automatic scaling, high availability, and minimal management overhead.
    • Key Features:
      • Single database or elastic pools (shared resources for multiple databases).
      • Automatic backups, patching, and scaling.
      • Built-in high availability (99.99% SLA).
      • Serverless compute tier for cost optimization.
      • Limited SQL Server surface area (fewer features compared to Managed Instance).
    • Limitations:
      • No support for SQL Server Agent, Database Mail, or cross-database queries.
      • Limited compatibility with on-premises SQL Server features.
    • Management: Fully managed by Microsoft; users only manage the database and its resources.

    Azure SQL Managed Instance

    • Description: A fully managed instance of SQL Server in Azure, offering near 100% compatibility with on-premises SQL Server. It is part of the PaaS offering but provides more control and features compared to Azure SQL Database.
    • Use Cases:
      • Lift-and-shift migrations of on-premises SQL Server workloads.
      • Applications requiring full SQL Server compatibility.
      • Scenarios needing features like SQL Server Agent, cross-database queries, or linked servers.
    • Key Features:
      • Near 100% compatibility with SQL Server.
      • Supports SQL Server Agent, Database Mail, and cross-database queries.
      • Built-in high availability (99.99% SLA).
      • Virtual network (VNet) integration for secure connectivity.
      • Automated backups and patching.
    • Limitations:
      • Higher cost compared to Azure SQL Database.
      • Slightly longer deployment times.
      • Limited to a subset of SQL Server features (e.g., no Windows Authentication).
    • Management: Fully managed by Microsoft, but users have more control over instance-level configurations.

    Azure SQL Server

    Description: A logical server in Azure that acts as a central administrative point for Azure SQL Database and Azure SQL Managed Instance. It is not a standalone database service but rather a management layer.

    Use Cases:

    • Managing multiple Azure SQL Databases or Managed Instances.
    • Centralized authentication and firewall rules.
    • Administrative tasks like setting up logins and managing access.

    Key Features:

    • Acts as a gateway for Azure SQL Database and Managed Instance.
    • Supports Azure Active Directory (AAD) and SQL authentication.
    • Configurable firewall rules for network security.
    • Provides a connection endpoint for databases.

    Limitations:

    • Not a database service itself; it is a management tool.
    • Does not host databases directly.

    Management: Users manage the server configuration, logins, and firewall rules.

    Side-by-Side Comparison

    Feature/Aspect | Azure SQL Database | Azure SQL Managed Instance | Azure SQL Server (Logical)
    Service Type | Fully managed PaaS | Fully managed PaaS | Management layer
    Compatibility | Limited SQL Server features | Near 100% SQL Server compatibility | N/A (management tool)
    Use Case | Cloud-native apps | Lift-and-shift migrations | Centralized management
    High Availability | 99.99% SLA | 99.99% SLA | N/A
    VNet Integration | Limited (via Private Link) | Supported | N/A
    SQL Server Agent | Not supported | Supported | N/A
    Cross-Database Queries | Not supported | Supported | N/A
    Cost | Lower | Higher | Free (included in service)
    Management Overhead | Minimal | Moderate | Minimal

    SQL Server’s Side-by-Side Feature: Not Available in Azure SQL

    The following lists show features that SQL Server has but that are not available (or only partially available) in Azure SQL Database and Azure SQL Managed Instance.

    1. Instance-Level Features

    Feature | SQL Server | Azure SQL Database | Azure SQL Managed Instance
    Multiple Databases Per Instance | ✅ Full support | ❌ Only single database per instance | ✅ Full support
    Cross-Database Queries | ✅ Full support | ❌ Limited with Elastic Query | ✅ Full support
    SQL Server Agent | ✅ Full support | ❌ Not available | ✅ Supported (with limitations)
    PolyBase | ✅ Full support | ❌ Not available | ❌ Not available
    CLR Integration (SQL CLR) | ✅ Full support | ❌ Not available | ✅ Supported (with limitations)
    FileStream/FileTable | ✅ Full support | ❌ Not available | ❌ Not available

    2. Security Features

    Feature | SQL Server | Azure SQL Database | Azure SQL Managed Instance
    Database Mail | ✅ Full support | ❌ Not available | ✅ Supported
    Service Broker | ✅ Full support | ❌ Not available | ❌ Not available
    Custom Certificates for Transparent Data Encryption (TDE) | ✅ Full support | ❌ Limited to Azure-managed keys | ❌ Limited customization

    3. Integration Services

    Feature | SQL Server | Azure SQL Database | Azure SQL Managed Instance
    SSIS Integration | ✅ Full support | ❌ Requires external tools | ❌ Requires external tools
    SSRS Integration | ✅ Full support | ❌ Not available | ❌ Not available
    SSAS Integration | ✅ Full support | ❌ Not available | ❌ Not available

    4. Specialized Features

    Feature | SQL Server | Azure SQL Database | Azure SQL Managed Instance
    Machine Learning Services (R/Python) | ✅ Full support | ❌ Not available | ❌ Not available
    Data Quality Services (DQS) | ✅ Full support | ❌ Not available | ❌ Not available

    Conclusion

    • Azure SQL Database: Ideal for new cloud-native applications or applications that don’t require full SQL Server compatibility.
    • Azure SQL Managed Instance: Best for migrating on-premises SQL Server workloads to the cloud with minimal changes.
    • Azure SQL Server (Logical): Used for managing and administering Azure SQL Databases and Managed Instances.


    Summary of Commonly used T-SQL queries

    SQL Execution Order:

    1. FROM – Specifies the tables involved in the query.
    2. JOIN – Joins multiple tables based on conditions.
    3. WHERE – Filters records before aggregation.
    4. GROUP BY – Groups records based on specified columns.
    5. HAVING – Filters aggregated results.
    6. SELECT – Specifies the columns to return.
    7. DISTINCT – Removes duplicate rows.
    8. ORDER BY – Sorts the final result set.
    9. TOP / OFFSET-FETCH (the T-SQL equivalent of LIMIT / OFFSET) – Limits the number of rows returned (see the example after this list).
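    To tie the order above to a concrete query, here is a sketch over hypothetical Orders and Customers tables; the comments mark the logical evaluation step of each clause, and row limiting uses T-SQL's OFFSET/FETCH:

    SELECT DISTINCT                                         -- 6. SELECT, 7. DISTINCT
           c.Country,
           SUM(o.Amount) AS TotalAmount
    FROM Orders AS o                                        -- 1. FROM
    JOIN Customers AS c ON c.CustomerID = o.CustomerID      -- 2. JOIN
    WHERE o.OrderDate >= '2024-01-01'                       -- 3. WHERE (before aggregation)
    GROUP BY c.Country                                      -- 4. GROUP BY
    HAVING SUM(o.Amount) > 1000                             -- 5. HAVING (after aggregation)
    ORDER BY TotalAmount DESC                               -- 8. ORDER BY
    OFFSET 0 ROWS FETCH NEXT 10 ROWS ONLY;                  -- 9. row limiting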

    TSQL Commands Categorized 

    • DDL – Data Definition Language
      CREATE, DROP, ALTER, TRUNCATE, COMMENT, RENAME
    • DQL – Data Query Language
      SELECT
    • DML – Data Manipulation Language
      INSERT, UPDATE, DELETE, LOCK
    • DCL – Data Control Language
      GRANT, REVOKE
    • TCL – Transaction Control Language
      BEGIN TRANSACTION, COMMIT, SAVEPOINT, ROLLBACK, SET TRANSACTION, SET CONSTRAINT
    DDL command
    ALTER
    Alter Table
    Add Column
    ALTER TABLE table_name
    ADD column_name data_type [constraints];
    ALTER TABLE Employees
    ADD Age INT NOT NULL;
    
    
    Drop Column
    ALTER TABLE table_name
    DROP COLUMN column_name;
    ALTER TABLE Employees
    DROP COLUMN Age;
    
    
    Modify Column (Change Data Type or Nullability)
    ALTER TABLE table_name
    ALTER COLUMN column_name new_data_type [NULL | NOT NULL];
    ALTER TABLE Employees
    ALTER COLUMN Age BIGINT NULL;
    
    
    Add Constraint:
    ALTER TABLE table_name ADD CONSTRAINT constraint_name constraint_type (column_name);
    ALTER TABLE Employees
    ADD CONSTRAINT PK_Employees PRIMARY KEY (EmployeeID);
    
    
    Drop Constraint:
    ALTER TABLE table_name DROP CONSTRAINT constraint_name;
    ALTER TABLE Employees DROP CONSTRAINT PK_Employees;
    
    Alter Database
    Change Database Settings
    ALTER DATABASE database_name
    SET option_name;
    ALTER DATABASE TestDB
    SET READ_ONLY;
    
    
    Change Collation
    ALTER DATABASE database_name
    COLLATE collation_name;
    ALTER DATABASE TestDB
    COLLATE SQL_Latin1_General_CP1_CI_AS;
    
    
    ALTER VIEW
    Modify an Existing View
    ALTER VIEW view_name
    AS
    SELECT columns
    FROM table_name
    WHERE condition;
    ALTER VIEW EmployeeView
    AS
    SELECT EmployeeID, Name, Department
    FROM Employees
    WHERE IsActive = 1;
    
    
    ALTER PROCEDURE
    Modify an Existing Stored Procedure
    ALTER PROCEDURE procedure_name
    AS
    BEGIN
        -- Procedure logic
    END;
    ALTER PROCEDURE GetEmployeeDetails
    @EmployeeID INT
    AS
    BEGIN
        SELECT * FROM Employees WHERE EmployeeID = @EmployeeID;
    END;
    
    
    ALTER FUNCTION
    ALTER FUNCTION function_name
    RETURNS data_type
    AS
    BEGIN
        -- Function logic
        RETURN value;
    END;
    ALTER FUNCTION GetFullName
    (@FirstName NVARCHAR(50), @LastName NVARCHAR(50))
    RETURNS NVARCHAR(100)
    AS
    BEGIN
        RETURN @FirstName + ' ' + @LastName;
    END;
    
    ALTER INDEX
    Rebuild Index:
    ALTER INDEX index_name
    ON table_name
    REBUILD;
    
    
    Reorganize Index:
    ALTER INDEX index_name
    ON table_name
    REORGANIZE;
    
    
    Disable Index:
    ALTER INDEX index_name
    ON table_name
    DISABLE;
    
    ALTER SCHEMA
    Move Object to a Different Schema
    ALTER SCHEMA new_schema_name
    TRANSFER current_schema_name.object_name;
    ALTER SCHEMA Sales
    TRANSFER dbo.Customers;
    
    
    ALTER ROLE
    Add Member:
    ALTER ROLE role_name
    ADD MEMBER user_name;
    ALTER ROLE db_datareader
    ADD MEMBER User1;
    
    Drop Member:
    ALTER ROLE role_name
    DROP MEMBER user_name;
    CREATE
    Create Table
    -- Create Parent Table
    CREATE TABLE Departments (
        DeptID INT IDENTITY(1, 1) PRIMARY KEY, -- IDENTITY column as the primary key
        DeptName NVARCHAR(100) NOT NULL
    );
    
    -- Create child table with FK
    CREATE TABLE Employees (
        EmpID INT IDENTITY(1, 1) PRIMARY KEY,  -- IDENTITY column as the primary key
        EmpName NVARCHAR(100) NOT NULL,
        Position NVARCHAR(50),
        DeptID INT NOT NULL,                   -- Foreign key column
        CONSTRAINT FK_Employees_Departments FOREIGN KEY (DeptID) REFERENCES Departments(DeptID)
    );

    DROP
    Drop Database

    Make sure no active connections are using the database.
    To forcibly close connections, use:

    ALTER DATABASE TestDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE TestDB;
    DROP DATABASE DatabaseName;
    DROP DATABASE TestDB;
    Drop Schema

    The DROP SCHEMA statement removes a schema, but it can only be dropped if no objects exist within it.

    Before dropping a schema, make sure to drop or move all objects inside it

    DROP TABLE SalesSchema.SalesTable;
    DROP SCHEMA SalesSchema;
    
    DROP SCHEMA SalesSchema;
    Drop Table

    Dropping a table will also remove constraints, indexes, and triggers associated with the table.

    DROP TABLE TableName;
    DROP TABLE Employees;
    Drop Column

    The DROP COLUMN statement removes a column from a table.

    ALTER TABLE TableName DROP COLUMN ColumnName;
    ALTER TABLE Employees DROP COLUMN Position;

    You cannot drop a column that is part of a PRIMARY KEY, FOREIGN KEY, or any other constraint unless you first drop the constraint.

    Additional DROP Statements

    Drop Index

    Remove an index from a table.
    DROP INDEX IndexName ON TableName;

    Drop Constraint

    Remove a specific constraint (e.g., PRIMARY KEY, FOREIGN KEY).
    ALTER TABLE TableName DROP CONSTRAINT ConstraintName;

    Order of Dependencies

    When dropping related objects, always drop dependent objects first:

    1. Constraints (if applicable)
    2. Columns (if applicable)
    3. Tables
    4. Schemas
    5. Database

    Example: Full Cleanup

    -- Drop a column
    ALTER TABLE Employees DROP COLUMN TempColumn;
    
    -- Drop a table
    DROP TABLE Employees;
    
    -- Drop a schema
    DROP SCHEMA HR;
    
    -- Drop a database
    ALTER DATABASE CompanyDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE CompanyDB;
    DQL command
    CTE

    A Common Table Expression (CTE) in SQL is a temporary result set that you can reference within a SELECT, INSERT, UPDATE, or DELETE statement. CTEs are particularly useful for simplifying complex queries, improving readability, and breaking down queries into manageable parts.

    WITH cte_name (optional_column_list) AS (
        -- Your query here
        SELECT column1, column2
        FROM table_name
        WHERE condition
    )
    -- Main query using the CTE
    SELECT *
    FROM cte_name;

    Breaking Down Multi-Step Logic

    WITH Step1 AS (
        SELECT ProductID, SUM(Sales) AS TotalSales
        FROM Sales
        GROUP BY ProductID
    ),
    Step2 AS (
        SELECT ProductID, TotalSales
        FROM Step1
        WHERE TotalSales > 1000
    )
    SELECT *
    FROM Step2;

    Intermediate Calculations

    WITH AverageSales AS (
        SELECT ProductID, AVG(Sales) AS AvgSales
        FROM Sales
        GROUP BY ProductID
    )
    SELECT ProductID, AvgSales
    FROM AverageSales
    WHERE AvgSales > 500;

    Recursive CTE

    WITH EmployeeHierarchy AS ( -- note: T-SQL does not use the RECURSIVE keyword
        SELECT EmployeeID, ManagerID, EmployeeName
        FROM Employees
        WHERE ManagerID IS NULL -- Starting point (CEO)
    
        UNION ALL
    
        SELECT e.EmployeeID, e.ManagerID, e.EmployeeName
        FROM Employees e
        JOIN EmployeeHierarchy eh ON e.ManagerID = eh.EmployeeID
    )
    SELECT *
    FROM EmployeeHierarchy;
    

    Why Use CTEs?

    1. Improve Readability:
      • CTEs allow you to break down complex queries into smaller, more understandable parts.
      • They make the query logic clearer by separating it into named, logical blocks.
    2. Reusability:
      • You can reference a CTE multiple times within the same query, avoiding the need to repeat subqueries.
    3. Recursive Queries:
      • CTEs support recursion, which is useful for hierarchical or tree-structured data (e.g., organizational charts, folder structures).
    4. Simplify Debugging:
      • Since CTEs are modular, you can test and debug individual parts of the query independently.
    5. Alternative to Subqueries:
      • CTEs are often easier to read and maintain compared to nested subqueries.

    Conditional Statements

    IF…ELSE …
    IF EXISTS (SELECT 1 FROM table_name WHERE column1 = 'value')
       BEGIN
           PRINT 'Record exists';
       END
    ELSE
       BEGIN
          PRINT 'Record does not exist';
       END;
    
    IF EXISTS …. or IF NOT EXISTS …
    -- Check if rows exist in the table
    IF EXISTS (SELECT * FROM Tb)
    BEGIN
        -- Code block to execute if rows exist
        PRINT 'Rows exist in the table';
    END;
    
    -- Check if rows do not exist in the table
    IF NOT EXISTS (SELECT * FROM Tb)
    BEGIN
        -- Code block to execute if no rows exist
        PRINT 'No rows exist in the table';
    END;
    
    CASE
    SELECT column1,
           CASE 
               WHEN column2 = 'value1' THEN 'Result1'
               WHEN column2 = 'value2' THEN 'Result2'
               ELSE 'Other'
           END AS result
    FROM table_name;
    


    Summary of SQL built-in functions

    BETWEEN value1 and value2

    The BETWEEN function in SQL is used to filter the result set within a specified range. It can be used with numeric, date, or textual values. Here’s a basic example:

    SELECT *
    FROM employees
    WHERE age BETWEEN 25 AND 35;
    
    STRING_AGG(column_name, ', ')

    The STRING_AGG() function in T-SQL (Transact-SQL) is used to concatenate values from multiple rows into a single string, separated by a specified delimiter. It’s particularly useful for combining text from multiple rows into a single row result.

    SELECT STRING_AGG(column_name, ', ') AS concatenated_column
    FROM table_name;
    • column_name is the name of the column containing the values you want to concatenate.
    • ', ' is the delimiter that separates the values in the resulting string.
    select id, AppDate,Company from cv where id between 270 AND 280
    id	AppDate	Company
    270	2021-04-24	dentalcorp  
    272	2021-04-24	EMHware
    274	2021-04-24	Altus Group  
    276	2021-04-24	Dawn InfoTek Inc.
    278	2021-04-25	Capco
    280	2021-04-25	OPTrust  
    
    select string_agg([Company],',') from cv where id between 270 AND 280 
    concated
    dentalcorp,EMHware,Altus Group,Dawn InfoTek Inc.,Capco,OPTrust  
    
    Concatenating Text from Multiple Rows

    For instance, suppose you have a table of employees with a column for their skills, and you want to get a list of all skills each employee has:

    SELECT employee_id, STRING_AGG(skill, ', ') AS skills
    FROM employee_skills
    GROUP BY employee_id;
    
    Generating a Comma-Separated List of Values

    If you have a table of orders and you want to list all products in each order:

    SELECT order_id, STRING_AGG(product_name, ', ') AS products
    FROM order_details
    GROUP BY order_id;
    
    Creating a Summary Report

    You can use STRING_AGG() to generate a summary report, for example, listing all customers in each country:

    SELECT country, STRING_AGG(customer_name, '; ') AS customers FROM customers GROUP BY country;
    Generating Dynamic SQL Queries
    DECLARE @sql NVARCHAR(MAX);
    SELECT @sql = STRING_AGG('SELECT * FROM ' + table_name, ' UNION ALL ')
    FROM tables_to_query;
    EXEC sp_executesql @sql;


    PySpark Data sources

    PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data from multiple formats, such as CSV, JSON, Parquet, and ORC, databases via JDBC, and more advanced formats like Avro, Delta Lake, and Hive tables.

    Query Database Table
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://{server_name}:{port}/{database_name}") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "employee") \
        .option("user", "root") \
        .option("password", "root") \
        .load()
    # Query table using jdbc()
    # Parameters
    server_name = "myserver.database.windows.net"
    port = "1433"
    database_name = "mydatabase"
    table_name = "mytable"
    username = "myusername"
    password = "mypassword"
    
    # Construct the JDBC URL
    jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
    connection_properties = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    # Read data from the SQL database
    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    # show DataFrame
    df.show()
    +-----+--------------+
    |depid|           dep|
    +-----+--------------+
    |    1|            IT|
    |    2|          Sale|
    |    3|       Finance|
    |    4|human resource|
    +-----+--------------+

    Query JDBC Table Parallel, specific Columns

    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("query", "select id,age from emp where sex='M'") \
        .option("numPartitions",5) \
        .option("fetchsize", 20) \
        .option("user", "root") \
        .option("password", "root") \
        .load()
    
    df.show()
    Read & Write CSV File

    Read CSV File

    df = spark.read.format("csv") \
        .option("delimiter", ",") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(["/Folder/", "/filePath2/"])  # pass a list of paths to read multiple locations

    Write CSV File

    df.write.format("csv") \
        .option("delimiter", ",") \
        .option("header", "true") \
        .mode("overwrite") \
        .save("/filePath/filename")

    By default, if you try to write directly to a file (e.g., name1.csv), it conflicts, because Spark does not generate a single file but a collection of part files in a directory:

    path
    dbfs:/FileStore/name1.csv/_SUCCESS
    dbfs:/FileStore/name1.csv/_committed_7114979947112568022
    dbfs:/FileStore/name1.csv/_started_7114979947112568022
    dbfs:/FileStore/name1.csv/part-00000-tid-7114979947112568022-b2482528-b2f8-4c82-82fb-7c763d91300e-12-1-c000.csv

    PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.

    df.coalesce(1).write.format("csv").mode("overwrite").options(header=True).save("dbfs:/FileStore/name1.csv")

    At this point, name1.csv in “dbfs:/FileStore/name1.csv” is treated as a directory rather than a file name. PySpark writes the single file with an auto-generated name like part-00000-<id>.csv:

    dbfs:/FileStore/name1.csv/part-00000-tid-4656450869948503591-85145263-6f01-4b56-ad37-3455ca9a8882-9-1-c000.csv

    We need to take an additional step – rename the file to name1.csv:

    # List all files in the directory
    files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")
    
    # Filter for the part file
    for file in files:
        if file.name.startswith("part-"):
            source_file = file.path  # Full path to the part file
            destination_file = "dbfs:/FileStore/name1.csv"
            
            # Move and rename the file
            dbutils.fs.mv(source_file, destination_file)
            break
    
    
    display(dbutils.fs.ls ( "dbfs:/FileStore/"))
    
    

    Direct Write with Custom File Name

    PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:

    Steps:

    1. Use coalesce(1) to combine all data into a single partition.
    2. Save the file to a temporary location.
    3. Rename the part file to the desired name.
    # Combine all data into one partition
    df.coalesce(1).write.format("csv") \
        .option("header", "true") \
        .mode("overwrite") \
        .save("dbfs:/FileStore/temp_folder")
    
    # Get the name of the part file
    files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
    for file in files:
        if file.name.startswith("part-"):
            part_file = file.path
            break
    
    # Move and rename the part file
    dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")
    
    # Remove the temporary folder
    dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
    Read & Write Parquet File

    Sample data is saved at /tmp/output/people.parquet:

    dbfs:/tmp/output/people.parquet/part-00007-tid-4023475225384587157-553904a7-7460-4fb2-a4b8-140ccc85024c-15-1-c000.snappy.parquet

    read from parquet

    sample data save at /tmp/output/people.parquet
    df1=spark.read.parquet("/tmp/output/people.parquet")
    +---------+----------+--------+-----+------+------+
    |firstname|middlename|lastname|  dob|gender|salary|
    +---------+----------+--------+-----+------+------+
    |   James |          |   Smith|36636|     M|  3000|
    | Michael |      Rose|        |40288|     M|  4000|
    |  Robert |          |Williams|42114|     M|  4000|
    |   Maria |      Anne|   Jones|39192|     F|  4000|
    |      Jen|      Mary|   Brown|     |     F|    -1|
    +---------+----------+--------+-----+------+------+

    Reading a Parquet file is the same as reading from CSV; nothing special.

    write to a parquet

    • append: Appends the data from the DataFrame to the existing Parquet file if the destination already exists. If the destination does not exist, it creates a new Parquet file.

      df.write.mode("append").parquet("path/to/parquet/file")
    • overwrite: This mode overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.

      df.write.mode("overwrite").parquet("path/to/parquet/file")
    • ignore: If the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.

      df.write.mode("ignore").parquet("path/to/parquet/file")
    • Create a partitioned Parquet file:

      df.write.partitionBy("gender","salary").mode("overwrite").parquet("/tmp/output/people2.parquet")

    Read & write Delta Table in PySpark

    Read from a Delta Table

    Read from a Registered Delta Table (in the Catalog)
    df = spark.read.table("default.dim_dep")
    
    df.show()
    +--------+-----+-----+----------+----------+----------+
    |Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
    +--------+-----+-----+----------+----------+----------+
    |       1| 1001|   IT|2019-01-01|9999-12-31|      true|
    |       2| 1002|Sales|2019-01-01|9999-12-31|      true|
    |       3| 1003|   HR|2019-01-01|9999-12-31|      true|
    +--------+-----+-----+----------+----------+----------+
    Read from a Delta Table Stored in a Directory (Path-Based)
    df_delta_table = spark.read.format("delta").load("dbfs:/mnt/dim/")
    
    df_delta_table.show()
    +--------+-----+-----+----------+----------+----------+
    |Surrokey|depID|  dep| StartDate|   EndDate|IsActivity|
    +--------+-----+-----+----------+----------+----------+
    |       1| 1001|   IT|2019-01-01|9999-12-31|      true|
    |       2| 1002|Sales|2019-01-01|9999-12-31|      true|
    |       3| 1003|   HR|2019-01-01|9999-12-31|      true|
    +--------+-----+-----+----------+----------+----------+
    

    Write to a Delta Table

    Append to a Delta Table
    # Appends to the Delta table
    df.write.format("delta").mode("append").save("dbfs:/mnt/dim/")  
    Overwrite a Delta Table
    # Overwrites the Delta table
    df.write.format("delta").mode("overwrite").save("dbfs:/mnt/dim/")  
    Create or Overwrite a Registered Delta Table
    # Overwrites the table in the catalog
    df.write.format("delta").mode("overwrite").saveAsTable("default.dim_dep") 
    Append to a Registered Delta Table:
    # Appends to the table in the catalog
    df.write.format("delta").mode("append").saveAsTable("default.dim_dep")  

    merge operation (upsert)

    from delta.tables import DeltaTable
    from pyspark.sql.functions import current_date, lit

    # Load the target Delta table and the source DataFrame
    # (paths are placeholders for illustration)
    target_table_path = "dbfs:/mnt/dim/"
    target_table = DeltaTable.forPath(spark, target_table_path)
    source_df = spark.read.format("delta").load("dbfs:/mnt/source/")

    # Perform the merge operation
    target_table.alias("t").merge(
        source_df.alias("s"),
        "t.id = s.id"  # Join condition: match rows based on `id`
    ).whenMatchedUpdate(
        set={
            "name": "s.name",  # Update `name` column
            "date": "s.date"   # Update `date` column
        }
    ).whenNotMatchedInsert(
        values={
            "id": "s.id",      # Insert `id`
            "name": "s.name",  # Insert `name`
            "date": "s.date"   # Insert `date`
        }
    ).execute()
    
    # Verify the result
    result_df = spark.read.format("delta").load(target_table_path)
    result_df.show()
    Explanation of the Code
    1. Target Table (target_table):
      • The Delta table is loaded using DeltaTable.forPath.
      • This table contains existing data where updates or inserts will be applied.
    2. Source DataFrame (source_df):
      • This DataFrame contains new or updated records.
    3. Join Condition ("t.id = s.id"):
      • Rows in the target table (t) are matched with rows in the source DataFrame (s) based on id.
    4. whenMatchedUpdate:
      • If a matching row is found, update the name and date columns in the target table.
    5. whenNotMatchedInsert:
      • If no matching row is found, insert the new record from the source DataFrame into the target table.
    6. execute():
      • Executes the merge operation, applying updates and inserts.
    7. Result Verification:
      • After the merge, the updated Delta table is read and displayed.

    Schema Evolution

    df.write.format("delta").option("mergeSchema", "true").mode("append").save("dbfs:/mnt/dim/")
    Read & Write JSON file

    Read a simple JSON file

    # Read a simple JSON file into dataframe
    {
      "RecordNumber": 10,
      "Zipcode": 709,
      "ZipCodeType": "STANDARD",
      "City": "BDA SAN LUIS",
      "State": "PR"
    }
    
    df_simple = spark.read.format("json") \
    .option("multiline","True")\
    .load("dbfs:/FileStore/simple_json.json")
    
    df_simple.printSchema()
    root
     |-- City: string (nullable = true)
     |-- RecordNumber: long (nullable = true)
     |-- State: string (nullable = true)
     |-- ZipCodeType: string (nullable = true)
     |-- Zipcode: long (nullable = true)
    
    df_simple.show()
    +------------+------------+-----+-----------+-------+
    |        City|RecordNumber|State|ZipCodeType|Zipcode|
    +------------+------------+-----+-----------+-------+
    |BDA SAN LUIS|          10|   PR|   STANDARD|    709|
    +------------+------------+-----+-----------+-------+

    Pay attention to “.option("multiline","True")”. Since my JSON file spans multiple lines (see the sample data above), the load still runs without this option, but the DataFrame will not work. Once you call show(), you will get this error:

    df_simple = spark.read.format("json").load("dbfs:/FileStore/simple_json.json")
    df_simple.show()

    AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).

    Reading from Multiline JSON (JSON Array) File

    [{
      "RecordNumber": 2,
      "Zipcode": 704,
      "ZipCodeType": "STANDARD",
      "City": "PASEO COSTA DEL SUR",
      "State": "PR"
    },
    {
      "RecordNumber": 10,
      "Zipcode": 709,
      "ZipCodeType": "STANDARD",
      "City": "BDA SAN LUIS",
      "State": "PR"
    }]
    df_jsonarray_simple = spark.read\
        .format("json")\
        .option("multiline", "true")\
        .load("dbfs:/FileStore/jsonArrary.json")
    
    df_jsonarray_simple.show()
    +-------------------+------------+-----+-----------+-------+
    |               City|RecordNumber|State|ZipCodeType|Zipcode|
    +-------------------+------------+-----+-----------+-------+
    |PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
    |       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
    +-------------------+------------+-----+-----------+-------+

    read complex json

    df_complexjson=spark.read\
        .option("multiline","true")\
        .json("dbfs:/FileStore/jsonArrary2.json")
    
    df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
    -RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
     id      | 0001                                                                                                                                      
     type    | donut                                                                                                                                     
     name    | Cake                                                                                                                                      
     ppu     | 0.55                                                                                                                                      
     batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}                                                           
     topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}] 
    -RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
     id      | 0002                                                                                                                                      
     type    | donut                                                                                                                                     
     name    | Raised                                                                                                                                    
     ppu     | 0.55                                                                                                                                      
     batters | {[{1001, Regular}]}                                                                                                                       
     topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]                                                           
    -RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
     id      | 0003                                                                                                                                      
     type    | donut                                                                                                                                     
     name    | Old Fashioned                                                                                                                             
     ppu     | 0.55                                                                                                                                      
     batters | {[{1001, Regular}, {1002, Chocolate}]}                                                                                                    
     topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]                                                                          
    
    

    Reading from Multiple files at a time

    # Read multiple files
    df2 = spark.read.json(
        ['resources/zipcode1.json','resources/zipcode2.json'])

    Reading from Multiple files at a time

    # Read all JSON files from a folder
    df3 = spark.read.json("resources/*.json")

    Reading files with a user-specified custom schema

    # Define custom schema
    schema = StructType([
          StructField("RecordNumber",IntegerType(),True),
          StructField("Zipcode",IntegerType(),True),
          StructField("ZipCodeType",StringType(),True),
          StructField("City",StringType(),True),
          StructField("State",StringType(),True),
          StructField("LocationType",StringType(),True),
          StructField("Lat",DoubleType(),True),
          StructField("Long",DoubleType(),True),
          StructField("Xaxis",IntegerType(),True),
          StructField("Yaxis",DoubleType(),True),
          StructField("Zaxis",DoubleType(),True),
          StructField("WorldRegion",StringType(),True),
          StructField("Country",StringType(),True),
          StructField("LocationText",StringType(),True),
          StructField("Location",StringType(),True),
          StructField("Decommisioned",BooleanType(),True),
          StructField("TaxReturnsFiled",StringType(),True),
          StructField("EstimatedPopulation",IntegerType(),True),
          StructField("TotalWages",IntegerType(),True),
          StructField("Notes",StringType(),True)
      ])
    
    df_with_schema = spark.read.schema(schema) \
            .json("resources/zipcodes.json")
    df_with_schema.printSchema()

    Reading File using PySpark SQL

    spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
          " (path 'resources/zipcodes.json')")
    spark.sql("select * from zipcode").show()

    Write to JSON

    Options

    • path: Specifies the path where the JSON files will be saved.
    • mode: Specifies the behavior when writing to an existing directory.
    • compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy").
      df2.write.option("compression", "gzip")
    • dateFormat: Specifies the format for date and timestamp columns.
      df.write.option("dateFormat", "yyyy-MM-dd")
    • timestampFormat: Specifies the format for timestamp columns.
      df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
    • lineSep: Specifies the character sequence to use as a line separator between JSON objects. \n (Unix/Linux newline); \r\n (Windows newline).
      df.write.option("lineSep", "\r\n")
    • encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings.
      df.write.option("encoding", "UTF-8")
    df2.write.format("json") \
     .option("compression", "gzip") \
     .option("dateFormat", "yyyy-MM-dd") \
     .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
     .option("encoding", "UTF-8") \
     .option("lineSep", "\r\n") \
     .save("dbfs:/FileStore/output_json")

    modes

    1. Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
    2. Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
    3. Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
    4. Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
    # Write with savemode example
    df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
    Read & Write SQL Server

    Read from SQL Server

    server_name = "mainri-sqldb.database.windows.net"
    port=1433
    username="my login name"
    password="my login password"
    database_name="mainri-sqldb"
    table_name="dep"
    
    jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
    connection_properties = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    df.show()
    +-----+--------------+
    |depid|           dep|
    +-----+--------------+
    |    1|            IT|
    |    2|          Sale|
    |    3|       Finance|
    |    4|human resource|
    +-----+--------------+
    

    alternative way

    df1=spark.read \
      .format("jdbc") \
      .option("url", jdbc_url) \
      .option("dbtable", table_name) \
      .option("user", username) \
      .option("password", password) \
      .load()

    Select Specific Columns to Read

    In the above example, it reads the entire table into a PySpark DataFrame. Sometimes you may not need the entire table; to select specific columns, specify the query you want with the query option (instead of dbtable).

    df1=spark.read \
      .format("jdbc") \
      .option("url", jdbc_url) \
      .option("databaseName",database_name) \
      .option("query", "select * from [dbo].[dep] where depid>3") \
      .option("user", username) \
      .option("password", password) \
      .load()
    
    df1.show()
    +-----+--------------+
    |depid|           dep|
    +-----+--------------+
    |    4|human resource|
    |    5|         admin|
    +-----+--------------+

    write to table

    write mode:

    Append:
    mode("append") to append the rows to the existing SQL Server table.

    # We have defined these variables before; shown here again
    server_name = "mainri-sqldb.database.windows.net"
    port=1433
    username="my login name"
    password="my login password"
    database_name="mainri-sqldb"
    table_name="dep"
    jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

    # append the rows to the existing SQL Server table.
    
    df_newrow.show()
    +-----+-----+
    |depid|  dep|
    +-----+-----+
    |    5|admin|
    +-----+-----+
    
    df_newrow.write \
      .format("jdbc") \
      .mode("append") \
      .option("url", jdbc_url) \
      .option("dbtable", table_name) \
      .option("user", username) \
      .option("password", password) \
      .save()
    +-----+--------------+
    |depid|           dep|
    +-----+--------------+
    |    1|            IT|
    |    2|          Sale|
    |    3|       Finance|
    |    4|human resource|
    |    5|         admin| <- new added row
    +-----+--------------+
    

    overwrite

    The mode("overwrite") drops the table if already exists by default and re-creates a new one without indexes. Use option(“truncate”,”true”) to retain the index.

    df_newrow.write \
      .format("jdbc") \
      .mode("overwrite") \
      .option("url", jdbc_url) \
      .option("dbtable", table_name) \
      .option("user", username) \
      .option("password", password) \
      .save()
    Read & Write MySQL

    Read from MySQL

    # Read from MySQL Table
    df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("dbtable", "employee") \
        .option("user", "root") \
        .option("password", "root") \
        .load()

    Select Specific Columns to Read

    # Read from MySQL Table
    df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("query", "select id,age from employee where gender='M'") \
        .option("numPartitions",4) \
        .option("fetchsize", 20) \
        .option("user", "root") \
        .option("password", "root") \
        .load()

    Write to MySQL

    Some points to note while writing

    • To re-write the existing table, use the mode("overwrite"). This drops the table if already exists by default and re-creates a new one without indexes.
    • To retain the indexes, use option("truncate","true").
    • By default, the connector uses the READ_COMMITTED isolation level. To change this (with the Microsoft SQL Server Spark connector), use option("mssqlIsolationLevel", "READ_UNCOMMITTED").
    • The dbtable option is used in PySpark to specify the name of the table in a database that you want to read data from or write data to.
    # Write to MySQL Table
    sampleDF.write \
      .format("jdbc") \
      .option("driver","com.mysql.cj.jdbc.Driver") \
      .option("url", "jdbc:mysql://localhost:3306/emp") \
      .option("dbtable", "employee") \
      .option("user", "root") \
      .option("password", "root") \
      .save()
    Read JDBC in Parallel

    Using the PySpark jdbc() method with the numPartitions option, you can read the database table in parallel. This option is used with both reading and writing.

    The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

    # Read Table in Parallel
    df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("dbtable","employee") \
        .option("numPartitions",5) \
        .option("user", "root") \
        .option("password", "root") \
        .load()
    

    Select columns with where clause

    # Select columns with where clause
    df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("query","select id,age from employee where gender='M'") \
        .option("numPartitions",5) \
        .option("user", "root") \
        .option("password", "root") \
        .load()

    Using fetchsize with numPartitions to Read

    The fetchsize option specifies how many rows to fetch at a time; by default it is set to 10. The JDBC fetch size determines how many rows to retrieve per round trip, which helps the performance of JDBC drivers. Do not set this to a very large number, as you might see issues.

    # Using fetchsize
    df = spark.read \
        .format("jdbc") \
        .option("driver","com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/emp") \
        .option("query","select id,age from employee where gender='M'") \
        .option("numPartitions",5) \
        .option("fetchsize", 20) \
        .option("user", "root") \
        .option("password", "root") \
        .load()
    Read & Write delta table, Catalog, Hive Table

    Read delta table, Catalog, Hive Table

    # Read Hive table
    df = spark.sql("select * from emp.employee")
    df.show()
    # Read Hive table
    df = spark.read.table("employee")
    df.show()
    # Read delta table
    df = spark.sql("select * from delta.` table path `  ")
    df.show()
    
    Caution: the table path must be wrapped in backticks (`).

    Write / Save

    To save a PySpark DataFrame to a Hive table, use the saveAsTable() function or use a SQL CREATE/INSERT statement on top of a temporary view.

    # Create Hive Internal table
    sampleDF.write.mode('overwrite') \
        .saveAsTable("emp.employee")

    Use SQL and a temporary view

    # Create temporary view
    sampleDF.createOrReplaceTempView("sampleView")
    
    # Create a Database CT
    spark.sql("CREATE DATABASE IF NOT EXISTS ct")
    
    # Create a Table naming as sampleTable under CT database.
    spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")
    
    # Insert into sampleTable using the sampleView. 
    spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")
    
    # Lets view the data in the table
    spark.sql("SELECT * FROM ct.sampleTable").show()


    Overview of Commonly Used Unity Catalog and Spark SQL Management Commands

    A summary of frequently used Unity Catalog and Spark SQL management commands, organized in a table.

    Category | Command | Description | Example
    Catalog Management | SHOW CATALOGS | Lists all available catalogs. | SHOW CATALOGS;
    Schema Management | SHOW SCHEMAS IN <catalog_name> | Lists schemas (databases) within a catalog. | SHOW SCHEMAS IN main;
    Schema Management | DESCRIBE SCHEMA <catalog_name>.<schema_name> | Provides metadata about a specific schema. | DESCRIBE SCHEMA main.default;
    Table Management | SHOW TABLES IN <catalog_name>.<schema_name> | Lists all tables in a schema. | SHOW TABLES IN main.default;
    Table Management | DESCRIBE TABLE <catalog_name>.<schema_name>.<table_name> | Displays metadata about a specific table. | DESCRIBE TABLE main.default.sales_data;
    Table Management | SHOW PARTITIONS <catalog_name>.<schema_name>.<table_name> | Lists partitions of a partitioned table. | SHOW PARTITIONS main.default.sales_data;
    Table Management | SHOW COLUMNS IN <catalog_name>.<schema_name>.<table_name> | Lists all columns of a table, including their data types. | SHOW COLUMNS IN main.default.sales_data;
    Table Management | DROP TABLE <catalog_name>.<schema_name>.<table_name> | Deletes a table from the catalog. | DROP TABLE main.default.sales_data;
    Database Management | SHOW DATABASES | Lists all databases (schemas) in the environment. | SHOW DATABASES;
    Database Management | DESCRIBE DATABASE <database_name> | Provides metadata about a specific database. | DESCRIBE DATABASE default;
    Data Querying | SELECT * FROM <catalog_name>.<schema_name>.<table_name> | Queries data from a table. | SELECT * FROM main.default.sales_data WHERE region = 'West';
    Table Creation | CREATE TABLE <catalog_name>.<schema_name>.<table_name> (<columns>) | Creates a managed table in Unity Catalog. | CREATE TABLE main.default.sales_data (id INT, region STRING, amount DOUBLE);


    Read a delta table from Blob/ADLS and write a delta table to Blob/ADLS

    When your Delta tables reside in Blob Storage or Azure Data Lake Storage (ADLS), you interact with them directly using their file paths. This differs from how you might access tables managed within a metastore like Unity Catalog, where you’d use a cataloged name.

    Reading Delta Tables from Blob Storage or ADLS

    To read Delta tables from Blob Storage or ADLS, you specify the path to the Delta table and use the delta format.

    Syntax

    -- Spark SQL
    SELECT * FROM delta.`/mnt/path/to/delta/table`
    
    Caution: the path is wrapped in backticks ( ` ).
    
    # PySpark
    df = spark.read.format("delta").load("path/to/delta/table")

    Writing Delta Tables to Blob Storage or ADLS

    When writing to Delta tables, use the delta format and specify the path where you want to store the table.

    Spark SQL has no DataFrame-style save for a path-based Delta table (use PySpark's DataFrame writer for that). However, you can still write from Spark SQL by inserting into the table with INSERT INTO:

    -- Spark SQL
    INSERT INTO delta.`/mnt/path/to/delta/table` SELECT * FROM my_temp_table
    
    Caution: the path is wrapped in backticks ( ` ).
    
    # PySpark 
    df.write.format("delta").mode("overwrite").save("path/to/delta/table")

    Options and Parameters for Delta Read/Write

    Options for Reading Delta Tables:

    You can configure the read operation with options like:

    • mergeSchema: Allows schema evolution if the structure of the Delta table changes.
    • spark.sql.files.ignoreCorruptFiles: Ignores corrupt files during reading.
    • Time travel options (versionAsOf / timestampAsOf): query older versions of the Delta table (see Time Travel below).
    df = spark.read.format("delta").option("mergeSchema", "true").load("path/to/delta/table")
    df.show()
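
    Note that spark.sql.files.ignoreCorruptFiles is a Spark session configuration rather than a per-read option; a minimal sketch of setting it before the read:

    # Session-level setting: skip corrupt files instead of failing the read
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    df = spark.read.format("delta").load("path/to/delta/table")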

    Options for Writing Delta Tables:

    mode: Controls the write mode.

    • overwrite: Overwrites the existing data.
    • append: Adds to the existing data (a minimal append sketch follows the write example below).
    • ignore: Skips the write if data already exists.
    • errorifexists (the default): Throws an error if data already exists.

    partitionBy: Partition the data by one or more columns.

    overwriteSchema: Overwrites the schema of an existing Delta table if there’s a schema change.

    df.write.format("delta").mode("overwrite") \
        .option("overwriteSchema", "true") \
        .partitionBy("column_name") \
        .save("path/to/delta/table")
    

    Time Travel and Versioning with Delta (PySpark)

    Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.

    # Read from a specific version
    df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
    df.show()
    
    # Read data at a specific timestamp
    df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
    df.show()
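
    To find out which versions and timestamps are available for time travel, you can inspect the table's transaction history. A minimal sketch using DESCRIBE HISTORY (note the backticks around the path):

    # List the table's versions, timestamps, and operations
    spark.sql("DESCRIBE HISTORY delta.`/mnt/path/to/delta/table`").show(truncate=False)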
    

    Conclusion:

    • Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
    • Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
    • For reading, use .format("delta").load("path").
    • For writing, use .write.format("delta").save("path").


    DBFS: Read/write a database using JDBC

    Read/write data from a SQL database using JDBC

    Connect to the database with JDBC

    Define the JDBC URL and connection properties

    
    jdbc_url = "jdbc:sqlserver://<server>:<port>;databaseName=<database>"
    
    connection_properties = {
        "user": "<username>",
        "password": "<password>",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    

    Read data from the SQL database

    df = spark.read.jdbc(url=jdbc_url, table="<table_name>", properties=connection_properties)

    Write data to the SQL database

    df.write.jdbc(url=jdbc_url, table="<table_name>", mode="overwrite", properties=connection_properties)

    Example

    # Parameters
    server_name = "myserver.database.windows.net"
    port = "1433"
    database_name = "mydatabase"
    table_name = "mytable"
    username = "myusername"
    password = "mypassword"
    
    # Construct the JDBC URL
    jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
    connection_properties = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    # Read data from the SQL database
    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    
    # Perform any transformations on df if needed
    
    # Write data to the SQL database
    df.write.jdbc(url=jdbc_url, table=table_name, mode="overwrite", properties=connection_properties)
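
    If you only need a subset of a large table, you can push the filtering down to SQL Server by passing a subquery (aliased as a table) instead of the table name; a minimal sketch with an illustrative query and columns:

    # Push a query down to the database instead of reading the whole table
    pushdown_query = "(SELECT id, name FROM mytable WHERE id > 100) AS t"
    df_subset = spark.read.jdbc(url=jdbc_url, table=pushdown_query, properties=connection_properties)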


    Appendix:

    Mapping SQL queries to KQL

    For those who are familiar with SQL query syntax but new to KQL, the following table shows sample queries in SQL and their KQL equivalents.

    Select data from table
      SQL: SELECT * FROM dependencies
      KQL: dependencies
      Learn more: Tabular expression statements
    
      SQL: SELECT name, resultCode FROM dependencies
      KQL: dependencies | project name, resultCode
      Learn more: project
    
      SQL: SELECT TOP 100 * FROM dependencies
      KQL: dependencies | take 100
      Learn more: take
    
    Null evaluation
      SQL: SELECT * FROM dependencies WHERE resultCode IS NOT NULL
      KQL: dependencies | where isnotnull(resultCode)
      Learn more: isnotnull()
    
    Comparison operators (date)
      SQL: SELECT * FROM dependencies WHERE timestamp > getdate()-1
      KQL: dependencies | where timestamp > ago(1d)
      Learn more: ago()
    
      SQL: SELECT * FROM dependencies WHERE timestamp BETWEEN ... AND ...
      KQL: dependencies | where timestamp between (datetime(2016-10-01) .. datetime(2016-11-01))
      Learn more: between
    
    Comparison operators (string)
      SQL: SELECT * FROM dependencies WHERE type = "Azure blob"
      KQL: dependencies | where type == "Azure blob"
      Learn more: Logical operators
    
      -- substring
      SQL: SELECT * FROM dependencies WHERE type like "%blob%"
      KQL: dependencies | where type has "blob"
      Learn more: has
    
      -- wildcard
      SQL: SELECT * FROM dependencies WHERE type like "Azure%"
      KQL: dependencies | where type startswith "Azure"
           (or) dependencies | where type matches regex "^Azure.*"
      Learn more: startswith, matches regex
    
    Comparison (boolean)
      SQL: SELECT * FROM dependencies WHERE !(success)
      KQL: dependencies | where success == False
      Learn more: Logical operators
    
    Grouping, Aggregation
      SQL: SELECT name, AVG(duration) FROM dependencies GROUP BY name
      KQL: dependencies | summarize avg(duration) by name
      Learn more: summarize, avg()
    
    Distinct
      SQL: SELECT DISTINCT name, type FROM dependencies
      KQL: dependencies | summarize by name, type
      Learn more: summarize, distinct
    
      SQL: SELECT name, COUNT(DISTINCT type) FROM dependencies GROUP BY name
      KQL: dependencies | summarize by name, type | summarize count() by name
           (or, approximate for large sets) dependencies | summarize dcount(type) by name
      Learn more: count(), dcount()
    
    Column aliases, Extending
      SQL: SELECT operationName as Name, AVG(duration) as AvgD FROM dependencies GROUP BY name
      KQL: dependencies | summarize AvgD = avg(duration) by Name=operationName
      Learn more: Alias statement
    
      SQL: SELECT conference, CONCAT(sessionid, ' ', session_title) AS session FROM ConferenceSessions
      KQL: ConferenceSessions | extend session=strcat(sessionid, " ", session_title) | project conference, session
      Learn more: strcat(), project
    
    Ordering
      SQL: SELECT name, timestamp FROM dependencies ORDER BY timestamp ASC
      KQL: dependencies | project name, timestamp | sort by timestamp asc nulls last
      Learn more: sort
    
    Top n by measure
      SQL: SELECT TOP 100 name, COUNT(*) as Count FROM dependencies GROUP BY name ORDER BY Count DESC
      KQL: dependencies | summarize Count = count() by name | top 100 by Count desc
      Learn more: top
    
    Union
      SQL: SELECT * FROM dependencies UNION SELECT * FROM exceptions
      KQL: union dependencies, exceptions
      Learn more: union
    
      SQL: SELECT * FROM dependencies WHERE timestamp > ... UNION SELECT * FROM exceptions WHERE timestamp > ...
      KQL: dependencies | where timestamp > ago(1d) | union (exceptions | where timestamp > ago(1d))
    
    Join
      SQL: SELECT * FROM dependencies LEFT OUTER JOIN exceptions ON dependencies.operation_Id = exceptions.operation_Id
      KQL: dependencies | join kind = leftouter (exceptions) on $left.operation_Id == $right.operation_Id
      Learn more: join
    
    Nested queries (sub-query)
      SQL: SELECT * FROM dependencies WHERE resultCode == (SELECT TOP 1 resultCode FROM dependencies WHERE resultId = 7 ORDER BY timestamp DESC)
      KQL: dependencies | where resultCode == toscalar(dependencies | where resultId == 7 | top 1 by timestamp desc | project resultCode)
      Learn more: toscalar
    
    Having
      SQL: SELECT COUNT(*) FROM dependencies GROUP BY name HAVING COUNT(*) > 3
      KQL: dependencies | summarize Count = count() by name | where Count > 3
      Learn more: summarize, where

    Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

    (remove all spaces from the email address 😊)