Logging in Azure Databricks with Python

In Azure Databricks, logging is crucial for monitoring, debugging, and auditing your notebooks, jobs, and applications. Since Databricks runs on a distributed architecture and utilizes standard Python, you can use familiar Python logging tools, along with features specific to the Databricks environment like Spark logging and MLflow tracking.

Python’s logging module provides a versatile logging system for recording messages at different severity levels and controlling how they are presented. To get started with the logging module, import it into your program first, as shown below:

import logging

logging.debug("A debug message")
logging.info("An info message")
logging.warning("A warning message")
logging.error("An error message")
logging.critical("A critical message")

Log levels in Python

Log levels define the severity of the event that is being logged. For example, a message logged at the INFO level indicates a normal and expected event, while one logged at the ERROR level signifies that an unexpected error has occurred.

Each log level in Python is associated with a number (from 10 to 50) and has a corresponding module-level method in the logging module as demonstrated in the previous example. The available log levels in the logging module are listed below in increasing order of severity:

Logging Level Quick Reference

| Level         | Meaning                  | When to Use         |
|---------------|--------------------------|---------------------|
| DEBUG (10)    | Detailed debugging info  | Development         |
| INFO (20)     | Normal messages          | Job progress        |
| WARNING (30)  | Minor issue              | Non-critical issues |
| ERROR (40)    | Failure                  | Recoverable errors  |
| CRITICAL (50) | System failure           | Stop the job        |

It’s important to always use the most appropriate log level so that you can quickly find the information you need. For instance, logging a message at the WARNING level will help you find potential problems that need to be investigated, while logging at the ERROR or CRITICAL level helps you discover problems that need to be rectified immediately.

By default, the logging module will only produce records for events that have been logged at a severity level of WARNING and above.

Logging basic configuration

Be sure to place the call to logging.basicConfig() before any methods such as info(), warning(), and others are used. It should also be called only once, as it is a one-off configuration facility: if it is called multiple times, only the first call has an effect.
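In notebooks, where cells are re-run often, this one-shot behavior can be surprising. On Python 3.8+ you can pass force=True to replace any existing handlers and reconfigure; a minimal sketch:

import logging

# force=True (Python 3.8+) removes handlers already attached to the root logger
# before applying the new configuration, so re-running this cell takes effect
logging.basicConfig(level=logging.DEBUG, force=True)
logging.debug("Now visible after reconfiguration")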

logging.basicConfig() example

import logging
from datetime import datetime

## set the date-time format for the run ID
run_id = datetime.now().strftime("%Y%m%d_%H%M%S")

## output to a file under DBFS (Databricks log file)
log_file = f"/dbfs/tmp/my_pipeline/logs/run_{run_id}.log"

## start configuring
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s — %(levelname)s — %(message)s",
)

## creates (or retrieves) a named logger that you will use to write log messages.
logger = logging.getLogger("pipeline")
## The string "pipeline" is just a name for the logger. It can be  anything:
## "etl"
## "my_app"
## "sales_job"
## "abc123"

Simple logging example

import logging
from datetime import datetime

run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = f"/dbfs/tmp/my_pipeline/logs/run_{run_id}.log"

logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s — %(levelname)s — %(message)s",
)
logger = logging.getLogger("pipeline")

logger.info("=== Pipeline Started ===")

try:
    logger.info("Step 1: Read data")
    df = spark.read.csv(...)
    
    logger.info("Step 2: Transform")
    df2 = df.filter(...)
    
    logger.info("Step 3: Write output")
    df2.write.format("delta").save(...)

    logger.info("=== Pipeline Completed Successfully ===")

except Exception as e:
    logger.error(f"Pipeline Failed: {e}")
    raise

The log output looks like this:

2025-11-21 15:05:01,112 - INFO - === Pipeline Started ===
2025-11-21 15:05:01,213 - INFO - Step 1: Read data
2025-11-21 15:05:01,315 - INFO - Step 2: Transform
2025-11-21 15:05:01,417 - INFO - Step 3: Write output
2025-11-21 15:05:01,519 - INFO - === Pipeline Completed Successfully ===

Log Rotation (Daily Files)

Log rotation means your log file does not grow forever. Instead, a new log file is created automatically each day (or each hour, week, etc.), and only a certain number of old files are kept.

This prevents:

  • huge log files
  • storage overflow
  • long-term disk growth
  • difficult debugging
  • slow I/O

It is very common in production systems (Databricks, Linux, app servers, databases). Without log rotation, one file grows huge; with daily rotation you get:

my_log.log (today)
my_log.log.2025-11-24 (yesterday)
my_log.log.2025-11-23
my_log.log.2025-11-22

Python code that does Log Rotation

import logging
from logging.handlers import TimedRotatingFileHandler

handler = TimedRotatingFileHandler(
    "/dbfs/Volumes/logs/my_log.log",
    when="midnight",   # rotate every day
    interval=1,        # 1 day
    backupCount=30     # keep last 30 days
)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger = logging.getLogger("rotating_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Log rotation enabled")
Hourly Log Rotation
handler = TimedRotatingFileHandler(
    "/dbfs/tmp/mylogs/hourly.log",
    when="H",       # rotate every hour
    interval=1,
    backupCount=24  # keep 24 hours
)

Size-Based Log Rotation
handler = RotatingFileHandler(
    "/dbfs/tmp/mylogs/size.log",
    maxBytes=5 * 1024 * 1024,  # 5 MB
    backupCount=5              # keep 5 old files
)

Logging to Unity Catalog Volume (BEST PRACTICE)

Create a volume first (once):

CREATE VOLUME IF NOT EXISTS my_catalog.my_schema.logs;
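If you prefer to run the DDL from a notebook instead of the SQL editor, the same statement can be issued through spark.sql (the catalog and schema names are placeholders):

# Create the Unity Catalog volume from a Python cell; names are placeholders
spark.sql("CREATE VOLUME IF NOT EXISTS my_catalog.my_schema.logs")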

Use it in code (assuming you have saved a reusable get_logger helper in a module named logger_setup, a variant of the helper shown later in this article, extended to accept an app name and log path):

from logger_setup import get_logger

logger = get_logger(
    app_name="customer_etl",
    log_path="/Volumes/my_catalog/my_schema/logs/customer_etl"
)

logger.info("Starting ETL pipeline")

Databricks ETL Logging Template (Production-Ready)

Features

  • Writes logs to file
  • Uses daily rotation (keeps 30 days)
  • Logs INFO, ERROR, stack traces
  • Works in notebooks + Jobs
  • Fully reusable

1. Create the logger (ready for copy & paste)

File version:

import logging
from logging.handlers import TimedRotatingFileHandler

def get_logger(name="etl"):
    log_path = "/dbfs/tmp/logs/pipeline.log"   # or a UC Volume

    handler = TimedRotatingFileHandler(
        log_path,
        when="midnight",
        interval=1,
        backupCount=30
    )

    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Prevent duplicate handlers in notebook re-runs
    if not logger.handlers:
        logger.addHandler(handler)

    return logger

logger = get_logger("my_pipeline")
logger.info("Logger initialized")
Unity Catalog Volume version
# If the folder doesn't exist:
dbutils.fs.mkdirs("/Volumes/my_catalog/my_schema/logs")

/Volumes/my_catalog/my_schema/logs/

# Create logger pointing to the UC Volume
import logging
from logging.handlers import TimedRotatingFileHandler

def get_logger(name="etl"):
    log_path = "/Volumes/my_catalog/my_schema/logs/pipeline.log"

    handler = TimedRotatingFileHandler(
        filename=log_path,
        when="midnight",
        interval=1,
        backupCount=30           # keep last 30 days
    )

    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Prevent duplicate handlers when re-running notebook cells
    if not logger.handlers:
        logger.addHandler(handler)

    return logger

logger = get_logger("my_pipeline")
logger.info("Logger initialized")

2. Use the logger inside your ETL

logger.info("=== ETL START ===")

try:
    logger.info("Step 1: Read data")
    df = spark.read.csv("/mnt/raw/data.csv")

    logger.info("Step 2: Transform")
    df2 = df.filter("value > 0")

    logger.info("Step 3: Write output")
    df2.write.format("delta").mode("overwrite").save("/mnt/curated/data")

    logger.info("=== ETL COMPLETED ===")

except Exception as e:
    logger.error(f"ETL FAILED: {e}", exc_info=True)
    raise

Resulting log file (example)

The output file looks like this:

2025-11-21 15:12:01,233 - INFO - my_pipeline - === ETL START ===
2025-11-21 15:12:01,415 - INFO - my_pipeline - Step 1: Read data
2025-11-21 15:12:01,512 - INFO - my_pipeline - Step 2: Transform
2025-11-21 15:12:01,660 - INFO - my_pipeline - Step 3: Write output
2025-11-21 15:12:01,780 - INFO - my_pipeline - === ETL COMPLETED ===

If an error occurs:

2025-11-21 15:15:44,812 - ERROR - my_pipeline - ETL FAILED: File not found
Traceback (most recent call last):
  ...

Best Practices for Logging in Python

By utilizing logs, developers can easily monitor, debug, and identify patterns that inform product decisions; to get that value, ensure the logs you generate are informative, actionable, and scalable.

  1. Avoid the root logger
    Create a dedicated logger for each module or component in the application.
  2. Centralize your logging configuration
    Keep all logging configuration code in a single Python module.
  3. Use correct log levels
  4. Write meaningful log messages
  5. Prefer lazy %-style formatting over f-strings in log calls
  6. Log in a structured format such as JSON (points 5 and 6 are illustrated in the sketch below)
  7. Include timestamps and ensure consistent formatting
  8. Keep sensitive information out of logs
  9. Rotate your log files
  10. Centralize your logs in one place
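To illustrate points 5 and 6: prefer lazy %-style arguments over f-strings (the message string is only built if the record is actually emitted), and a simple JSON formatter can be written with the standard library alone. A minimal sketch, not tied to any particular logging backend:

import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON line (structured logging)."""
    def format(self, record):
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("structured")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Lazy %-style formatting: interpolation happens only if INFO is enabled
rows = 1500
logger.info("Loaded %d rows", rows)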

Conclusion

To achieve the best logging practices, it is important to use appropriate log levels and message formats, and implement proper error handling and exception logging. Additionally, you should consider implementing log rotation and retention policies to ensure that your logs are properly managed and archived.

Technical Interview Questions and Answers

The Job Interview is the single most critical step in the job hunting process. It is the definitive arena where all of your abilities must integrate. The interview itself is not a skill you possess; it is the moment you deploy your Integrated Skill Set, blending:

  1. Hard Skills (Technical Mastery): Demonstrating not just knowledge of advanced topics, but the depth of your expertise and how you previously applied it to solve complex, real-world problems.
  2. Soft Skills (Communication & Presence): Clearly articulating strategy, managing complexity under pressure, and exhibiting the leadership presence expected of senior-level and expert-level candidates.
  3. Contextual Skills (Business Acumen): Framing your solutions within the company’s business goals and culture, showing that you understand the strategic impact of your work.

This Integrated Skill represents your first real opportunity to sell your strategic value to the employer.


Databricks

Azure Databricks / Delta / Unity Catalog / Lakehouse
What is Databricks Delta (Delta Lakehouse) and how does it enhance the capabilities of Azure Databricks?

Databricks Delta, now known as Delta Lake, is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It enhances Azure Databricks by providing features like:

  • ACID transactions for data reliability and consistency.
  • Scalable metadata handling for large tables.
  • Time travel for data versioning and historical data analysis.
  • Schema enforcement and evolution.
  • Improved performance with data skipping and Z-ordering.
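For example, time travel can be exercised directly from a notebook (the table name below is a placeholder):

# Read an earlier version of a Delta table (time travel); table name is a placeholder
df_v5 = spark.read.option("versionAsOf", 5).table("my_catalog.my_schema.sales")

# Equivalent SQL
spark.sql("SELECT * FROM my_catalog.my_schema.sales VERSION AS OF 5").show()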
Are there any alternative solutions similar to the Delta Lakehouse?

There are several alternative technologies that provide Delta Lake-style Lakehouse capabilities (ACID + schema enforcement + time travel + scalable storage + SQL engine), such as:

  • Apache Iceberg
  • Apache Hudi
  • Snowflake (Iceberg Tables / Unistore)
  • BigQuery + BigLake
  • AWS Redshift + Lake Formation + Apache Iceberg
  • Microsoft Fabric (OneLake + Delta/DQ/DLTS)
What is a Delta Lake Table?

Delta Lake tables are tables that store data in the Delta format. Delta Lake is an extension to existing data lakes that adds a transactional storage layer on top of the files already stored there.

Explain how you can use Databricks to implement a Medallion Architecture (Bronze, Silver, Gold).
  1. Bronze Layer (Raw Data): Ingest raw data from various sources into the Bronze layer. This data is stored as-is, without any transformation.
  2. Silver Layer (Cleaned Data, also known as the Enriched layer): Clean and enrich the data from the Bronze layer. Apply transformations, data cleansing, and filtering to create more refined datasets.
  3. Gold Layer (Aggregated Data, also known as the Curated layer): Aggregate and further transform the data from the Silver layer to create high-level business tables or machine learning features. This layer is used for analytics and reporting.
What Is Z-Order (Databricks / Delta Lake)?

Z-Ordering in Databricks (specifically for Delta Lake tables) is an optimization technique designed to co-locate related information into the same set of data files on disk.

OPTIMIZE mytable
ZORDER BY (col1, col2);
What Is Liquid Clustering (Databricks)?

Liquid Clustering is Databricks’ next-generation data layout optimization for Delta Lake.
It replaces (and is far superior to) Z-Order.

ALTER TABLE sales
CLUSTER BY (customer_id, event_date);

-- Run OPTIMIZE to (re)cluster existing data
OPTIMIZE sales;
What is a Dataframe, RDD, Dataset in Azure Databricks?

A DataFrame is a table-like data structure used to hold data within Databricks at runtime: the data is arranged into two-dimensional rows and columns for easy access and manipulation.

RDD, Resilient Distributed Dataset, is a fault-tolerant, immutable collection of elements partitioned across the nodes of a cluster. RDDs are the basic building blocks that power all of Spark’s computations.

Dataset is an extension of the DataFrame API that provides compile-time type safety and object-oriented programming benefits.

What is caching and what are its types?

A cache is temporary storage that holds frequently accessed data, aiming to reduce latency and enhance speed; caching is the process of storing data in that cache. In Databricks there are two main types: Apache Spark caching, invoked explicitly with cache()/persist(), and Databricks disk caching, which automatically keeps local copies of remote Parquet data on the workers' local disks.
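A short PySpark sketch of explicit caching (the table name is a placeholder):

from pyspark import StorageLevel

df = spark.read.table("my_catalog.my_schema.sales")  # placeholder table

# Keep the DataFrame in memory, spilling to disk if it does not fit
df.persist(StorageLevel.MEMORY_AND_DISK)

df.count()                                  # first action materializes the cache
df.groupBy("customer_id").count().show()    # subsequent actions reuse cached data

df.unpersist()                              # release the cached blocks when done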

How would you secure and manage secrets in Azure Databricks when connecting to external data sources?
  1. Use Azure Key Vault to store and manage secrets securely.
  2. Integrate Azure Key Vault with Azure Databricks using Databricks-backed or Azure-backed scopes.
  3. Access secrets in notebooks and jobs using the dbutils.secrets API.
    dbutils.secrets.get(scope="<scope-name>", key="<key-name>")
  4. Ensure that secret access policies are strictly controlled and audited.
Scenario: You need to implement a data governance strategy in Azure Databricks. What steps would you take?

  • Data Classification: Classify data based on sensitivity and compliance requirements.
  • Access Controls: Implement role-based access control (RBAC) using Azure Active Directory.
  • Data Lineage: Use tools like Databricks Lineage to track data transformations and movement.
  • Audit Logs: Enable and monitor audit logs to track access and changes to data.
  • Compliance Policies: Implement Azure Policies and Azure Purview for data governance and compliance monitoring.
Scenario: You need to optimize a Spark job that has a large number of shuffle operations causing performance issues. What techniques would you use?
  1. Repartitioning: Repartition the data to balance the workload across nodes and reduce skew.
  2. Broadcast Joins: Use broadcast joins for small datasets to avoid shuffle operations.
  3. Caching: Cache intermediate results to reduce the need for recomputation.
  4. Shuffle Partitions: Increase the number of shuffle partitions to distribute the workload more evenly.
  5. Skew Handling: Identify and handle skewed data by adding salt keys or custom partitioning strategies.
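A minimal PySpark sketch of two of these techniques, broadcast joins and shuffle-partition tuning (the table names are placeholders):

from pyspark.sql.functions import broadcast

large_df = spark.read.table("my_catalog.my_schema.transactions")  # placeholder tables
small_df = spark.read.table("my_catalog.my_schema.dim_country")

# Broadcasting the small dimension table avoids shuffling the large one
joined = large_df.join(broadcast(small_df), on="country_code")

# Tune the number of shuffle partitions for the workload (default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "400")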
Scenario: You need to migrate an on-premises Hadoop workload to Azure Databricks. Describe your migration strategy.
  • Assessment: Evaluate the existing Hadoop workloads and identify components to be migrated.
  • Data Transfer: Use Azure Data Factory or Azure Databricks to transfer data from on-premises HDFS to ADLS.
  • Code Migration: Convert Hadoop jobs (e.g., MapReduce, Hive) to Spark jobs and test them in Databricks.
  • Optimization: Optimize the Spark jobs for performance and cost-efficiency.
  • Validation: Validate the migrated workloads to ensure they produce the same results as on-premises.
  • Deployment: Deploy the migrated workloads to production and monitor their performance.


SQL / KQL

SQL
Can you talk about database locking?

Database locking is the mechanism a database uses to control concurrent access to data so that transactions stay consistent, isolated, and safe.

Locking prevents:

  • Dirty reads
  • Lost updates
  • Write conflicts
  • Race conditions

Types of Locks:

1. Shared Lock (S)

  • Used when reading data
  • Multiple readers allowed
  • No writers allowed

2. Exclusive Lock (X)

  • Used when updating or inserting
  • No one else can read or write the locked item

3. Update Lock (U) (SQL Server specific)

  • Prevents deadlocks when upgrading from Shared → Exclusive
  • Only one Update lock allowed

4. Intention Locks (IS, IX, SIX)

Used at table or page level to signal a lower-level lock is coming.

5. Row / Page / Table Locks

Based on granularity:

  • Row-level: Most common, best concurrency
  • Page-level: Several rows together
  • Table-level: When scanning or modifying large portions

DB engines automatically escalate locks (Row → Page → Table) when there are too many small locks.

Can you talk on Deadlock?

A deadlock happens when:

  • Transaction A holds Lock 1 and wants Lock 2
  • Transaction B holds Lock 2 and wants Lock 1

Both wait on each other → neither can move → database detects → kills one transaction (“deadlock victim”).

Deadlocks usually involve two writers, but can also involve readers depending on the isolation level.

How to Troubleshoot Deadlocks?
A: In SQL Server, enable deadlock graph capture. Recent versions record deadlock graphs automatically in the system_health Extended Events session; you can also enable the classic trace flags to write deadlock details to the error log:

DBCC TRACEON (1222, -1);
DBCC TRACEON (1204, -1);
B: Interpret the Deadlock Graph

You will see:

  • Processes (T1, T2…)
  • Resources (keys, pages, objects)
  • Types of locks (X, S, U, IX, etc.)
  • Which statement caused the deadlock

Look for:

  • Two queries touching the same index/rows in different order
  • A scanning query locking too many rows
  • Missed indexes
  • Query patterns that cause U → X lock upgrades
C. Identify
  • The exact tables/indexes involved
  • The order of locking
  • The hotspot row or range
  • Rows with heavy update/contention

This will tell you what to fix.

How to Prevent Deadlocks (Practical + Senior-Level)
  • Always update rows in the same order
  • Keep transactions short
  • Use appropriate indexes
  • Use the correct isolation level
  • Avoid long reads before writes

Can you discuss database normalization and denormalization?

Normalization is the process of structuring a relational database to minimize data redundancy (duplicate data) and improve data integrity.

| Normal Form | Rule Summary | Problem Solved |
|---|---|---|
| 1NF (First) | Eliminate repeating groups; ensure all column values are atomic (indivisible). | Multi-valued columns, non-unique rows. |
| 2NF (Second) | Be in 1NF, AND all non-key attributes must depend on the entire primary key. | Partial dependency (non-key attribute depends on part of a composite key). |
| 3NF (Third) | Be in 2NF, AND eliminate transitive dependency (non-key attribute depends on another non-key attribute). | Redundancy due to indirect dependencies. |
| BCNF (Boyce-Codd) | A stricter version of 3NF; every determinant (column that determines another column) must be a candidate key. | Edge cases involving multiple candidate keys. |

Denormalization is the process of intentionally introducing redundancy into a previously normalized database to improve read performance and simplify complex queries.

  • Adding Redundant Columns: Copying a value from one table to another (e.g., copying the CustomerName into the Orders table to avoid joining to the Customer table every time an order is viewed).
  • Creating Aggregate/Summary Tables: Storing pre-calculated totals, averages, or counts to avoid running expensive aggregate functions at query time (e.g., a table that stores the daily sales total).
  • Merging Tables: Combining two tables that are frequently joined into a single, wider table.


Markdown in a Databricks Notebook

Databricks Notebook Markdown is a special version of the Markdown language built directly into Databricks notebooks. It allows you to add richly formatted text, images, links, and even mathematical equations to your notebooks, turning them from just code scripts into interactive documents and reports.

Think of it as a way to provide context, explanation, and structure to your code cells, making your analysis reproducible and understandable by others (and your future self!).

Why is it Important?

Using Markdown cells effectively transforms your workflow:

  1. Documentation: Explain the purpose of the analysis, the meaning of a complex transformation, or the interpretation of a result.
  2. Structure: Create sections, headings, and tables of contents to organize long notebooks.
  3. Clarity: Add lists, tables, and links to data sources or external references.
  4. Communication: Share findings with non-technical stakeholders by narrating the story of your data directly alongside the code that generated it.

Key Features and Syntax with Examples

1. Headers (for Structure)

Use # to create different levels of headings.

%md
# Title (H1)
## Section 1 (H2)
### Subsection 1.1 (H3)
#### This is a H4 Header

Title (H1)

Section 1 (H2)

Subsection 1.1 (H3)

This is a H4 Header

2. Emphasis (Bold and Italic)

%md
*This text will be italic*
_This will also be italic_

**This text will be bold**
__This will also be bold__

**_You can combine them_**

This text will be italic This will also be italic

This text will be bold This will also be bold

You can combine them

3. Lists (Ordered and Unordered)

Unordered List:

%md
- Item 1
- Item 2
  - Sub-item 2.1
  - Sub-item 2.2

Renders as:

  • Item 1
  • Item 2
    • Sub-item 2.1
    • Sub-item 2.2
Ordered List:

%md
1. First item
2. Second item
   1. Sub-item 2.1
3. Third item

Renders as:

  1. First item
  2. Second item
    1. Sub-item 2.1
  3. Third item

4. Links and Images

Link:

%md
[Databricks Website](https://databricks.com)

This renders as a clickable link.

Image:

%md
![mainri Inc. Logo](https://mainri.ca/wp-content/uploads/2024/08/Logo-15-trans.png)

This renders the mainri Inc. logo image.

5. Tables

%md
| Column 1 Header | Column 2 Header | Column 3 Header |
|-----------------|-----------------|-----------------|
| Row 1, Col 1    | Row 1, Col 2    | Row 1, Col 3    |
| Row 2, Col 1    | Row 2, Col 2    | Row 2, Col 3    |
| *Italic Cell*   | **Bold Cell**   | Normal Cell     |
This renders as a formatted table, with the italic and bold cells styled accordingly.

6. Code Syntax Highlighting (A Powerful Feature)

%md
```python
df = spark.read.table("samples.nyctaxi.trips")
display(df)
```

```sql
SELECT * FROM samples.nyctaxi.trips LIMIT 10;
```

```scala
val df = spark.table("samples.nyctaxi.trips")
display(df)
```

7. Mathematical Equations (LaTeX)

%md


$$
f(x) = \sum_{i=0}^{n} \frac{x^i}{i!}
$$

Summary

| Feature | Purpose | Example Syntax |
|---|---|---|
| Headers | Create structure and sections | ## My Section |
| Emphasis | Add bold/italic emphasis | **bold**, *italic* |
| Lists | Create bulleted or numbered lists | - Item or 1. Item |
| Tables | Organize data in a grid | \| Header \| |
| Links/Images | Add references and visuals | [Text](URL) |
| Code Blocks | Display syntax-highlighted code | ```python … ``` |
| Math (LaTeX) | Render mathematical formulas | $$E = mc^2$$ |

In essence, Databricks Notebook Markdown is the narrative glue that binds your code, data, and insights together, making your notebooks powerful tools for both analysis and communication.

Comparison of Microsoft Fabric, Azure Synapse Analytics (ASA), Azure Data Factory (ADF), and Azure Databricks (ADB)

Today, data engineers have a wide array of tools and platforms at their disposal for data engineering projects. Popular choices include Microsoft Fabric, Azure Synapse Analytics (ASA), Azure Data Factory (ADF), and Azure Databricks (ADB). It’s common to wonder which one is the best fit for your specific needs.

Side by Side comparison

Here’s a concise comparison of Microsoft Fabric, Azure Synapse Analytics, Azure Data Factory (ADF), and Azure Databricks (ADB) based on their key features, use cases, and differences:

| Feature | Microsoft Fabric | Azure Synapse Analytics | Azure Data Factory (ADF) | Azure Databricks (ADB) |
|---|---|---|---|---|
| Type | Unified SaaS analytics platform | Integrated analytics service | Cloud ETL/ELT service | Apache Spark-based analytics platform |
| Primary Use Case | End-to-end analytics (Data Engineering, Warehousing, BI, Real-Time) | Large-scale data warehousing & analytics | Data integration & orchestration | Big Data processing, ML, AI, advanced analytics |
| Data Integration | Built-in Data Factory capabilities | Synapse Pipelines (similar to ADF) | Hybrid ETL/ELT pipelines | Limited (relies on Delta Lake, ADF, or custom code) |
| Data Warehousing | OneLake (Delta-Parquet based) | Dedicated SQL pools (MPP) | Not applicable | Can integrate with Synapse/Delta Lake |
| Big Data Processing | Spark-based (Fabric Spark) | Spark pools (serverless/dedicated) | No (orchestration only) | Optimized Spark clusters (Delta Lake) |
| Real-Time Analytics | Yes (Real-Time Hub) | Yes (Synapse Real-Time Analytics) | No | Yes (Structured Streaming) |
| Business Intelligence | Power BI (deeply integrated) | Power BI integration | No | Limited (via dashboards or Power BI) |
| Machine Learning | Basic ML integration | ML in Spark pools | No | Full ML/DL support (MLflow, AutoML) |
| Pricing Model | Capacity-based (Fabric SKUs) | Pay-as-you-go (serverless) or dedicated | Activity-based | DBU-based (compute + storage) |
| Open Source Support | Limited (Delta-Parquet) | Limited (Spark, SQL) | No | Full (Spark, Python, R, ML frameworks) |
| Governance | Centralized (OneLake, Purview) | Workspace-level | Limited | Workspace-level (Unity Catalog) |

Key Differences

  • Fabric vs Synapse: Fabric is a fully managed SaaS (simpler, less configurable), while Synapse offers more control (dedicated SQL pools, Spark clusters).
  • ADF vs Synapse Pipelines: Synapse Pipelines = ADF inside Synapse (same engine).
  • ADB vs Fabric Spark: ADB has better ML & open-source support, while Fabric Spark is simpler & integrated with Power BI.

When to Use Which

  1. Microsoft Fabric
    • Best for end-to-end analytics in a unified SaaS platform (no infrastructure management).
    • Combines data engineering, warehousing, real-time, and BI in one place.
    • Good for Power BI-centric organizations.
  2. Azure Synapse Analytics
    • Best for large-scale data warehousing with SQL & Spark processing.
    • Hybrid of ETL (Synapse Pipelines), SQL Pools, and Spark analytics.
    • More flexible than Fabric (supports open formats like Parquet, CSV).
  3. Azure Data Factory (ADF)
    • Best for orchestrating ETL/ELT workflows (no compute/storage of its own).
    • Used for data movement, transformations, and scheduling.
    • Often paired with Synapse or Databricks.
  4. Azure Databricks (ADB)
    • Best for advanced analytics, AI/ML, and big data processing with Spark.
    • Optimized for Delta Lake (ACID transactions on data lakes).
    • Preferred for data science teams needing MLflow, AutoML, etc.

Which One Should You Choose?

  • For a fully integrated Microsoft-centric solution → Fabric
  • For large-scale data warehousing + analytics → Synapse
  • For ETL/data movement → ADF (or Synapse Pipelines)
  • For advanced Spark-based analytics & ML → Databricks

Data Quality Framework (DQX)

Data quality is more critical than ever in today’s data-driven world. Organizations are generating and collecting vast amounts of data, and the ability to trust and leverage this information is paramount for success. Poor data quality can have severe negative impacts, ranging from flawed decision-making to regulatory non-compliance and significant financial losses.

Key Dimensions of Data Quality (DAMA-DMBOK or ISO 8000 Standards)

A robust DQX evaluates data across multiple dimensions:

  • Accuracy: Data correctly represents real-world values.
  • Completeness: No missing or null values where expected.
  • Consistency: Data is uniform across systems and over time.
  • Timeliness: Data is up-to-date and available when needed.
  • Validity: Data conforms to defined business rules (e.g., format, range).
  • Uniqueness: No unintended duplicates.
  • Integrity: Relationships between datasets are maintained.

What is Data Quality Framework (DQX)

Data Quality Framework (DQX) is an open-source framework from Databricks Labs designed to simplify and automate data quality checks for PySpark workloads on both batch and streaming data.

DQX provides a structured approach to assessing, monitoring, and improving the quality of data within an organization. It lets you define, validate, and enforce data quality rules across your data pipelines, ensuring that data is accurate, consistent, complete, reliable, and fit for its intended use, so it can be used confidently for analytics, reporting, compliance, and decision-making.

This article will explore how the DQX framework helps improve data reliability, reduce data errors, and enforce compliance with data quality standards. We will walk step by step through setting up and using the DQX framework in a Databricks notebook, with code snippets that implement data quality checks.

DQX usage in the Lakehouse Architecture

In the Lakehouse architecture, new data should be validated as it enters the curated layer to ensure bad data is not propagated to the subsequent layers. With DQX, you can implement the Dead-Letter pattern to quarantine invalid data and re-ingest it after curation to ensure data quality constraints are met. Data quality can be monitored in real time between layers, and the quarantine process can be automated.

Credits: https://databrickslabs.github.io/dqx/docs/motivation/

Components of a Data Quality Framework

A DQX typically includes:

A. Data Quality Assessment

  • Profiling: Analyze data to identify anomalies (e.g., outliers, nulls).
  • Metrics & KPIs: Define measurable standards (e.g., % completeness, error rates).
  • Benchmarking: Compare against industry standards or past performance.

B. Data Quality Rules & Standards

  • Define validation rules (e.g., “Email must follow RFC 5322 format”).
  • Implement checks at the point of entry (e.g., form validation) and during processing.

C. Governance & Roles

  • Assign data stewards responsible for quality.
  • Establish accountability (e.g., who fixes issues? Who approves changes?).

D. Monitoring & Improvement

  • Automated checks: Use tools like Great Expectations, Talend, or custom scripts.
  • Root Cause Analysis (RCA): Identify why errors occur (e.g., system glitches, human input).
  • Continuous Improvement: Iterative fixes (e.g., process changes, user training).

E. Tools & Technology

  • Data Quality Tools: Informatica DQ, IBM InfoSphere, Ataccama, or open-source (Apache Griffin).
  • Metadata Management: Track data lineage and quality scores.
  • AI/ML: Anomaly detection (e.g., identifying drift in datasets).

F. Culture & Training

  • Promote data literacy across teams.
  • Encourage reporting of data issues without blame.

Using Databricks DQX Framework in a Notebook

Step by Step Implementing DQX

Step 1: Install the DQX Library

Install it in the notebook using %pip:

%pip install databricks-labs-dqx

# Restart the kernel after the package is installed in the notebook:
# in a separate cell run:
dbutils.library.restartPython()

Step 2: Initialize the Environment and read input data

Set up the necessary environment for running the Databricks DQX framework, including:

Importing the key components from the Databricks DQX library.
  • DQProfiler: Used for profiling the input data to understand its structure and generate summary statistics.
  • DQGenerator: Generates data quality rules based on the profiles.
  • DQEngine: Executes the defined data quality checks.
  • WorkspaceClient: Handles communication with the Databricks workspace.
Import Libraries
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
Loading the input data that you want to evaluate.
# Read the input data from a Delta table
input_df = spark.read.table("catalog.schema.table")
Establishing a connection to the Databricks workspace.
# Initialize the WorkspaceClient to interact with the Databricks workspace
ws = WorkspaceClient()

# Initialize a DQProfiler instance with the workspace client
profiler = DQProfiler(ws)
Profiling for data quality.
# Profile the input DataFrame to get summary statistics and data profiles

summary_stats, profiles = profiler.profile(input_df)
Generate DQX quality rules/checks
# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)  # with default level "error"

dq_engine = DQEngine(ws)
Save checks in arbitrary workspace location
# save checks in arbitrary workspace location
dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")
Generate DLT expectations
# generate DLT expectations
# (assumed import path, per the DQX docs)
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator

dlt_generator = DQDltGenerator(ws)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)

The profiler samples 30% of the data (sample ratio = 0.3) and limits the input to 1000 records by default.
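Those defaults can be made explicit (or overridden) through the options argument, as in this small sketch:

# Make the defaults explicit: sample 30% of rows, cap the input at 1000 records
summary_stats, profiles = profiler.profile(
    input_df,
    options={"sample_fraction": 0.3, "limit": 1000},
)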

Profiling a Table

Tables can be loaded and profiled using profile_table.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Profile a single table directly
ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Profile a specific table with custom options
summary_stats, profiles = profiler.profile_table(
    table="catalog1.schema1.table1",
    columns=["col1", "col2", "col3"],  # specify columns to profile
    options={
        "sample_fraction": 0.1,  # sample 10% of data
        "limit": 500,            # limit to 500 records
        "remove_outliers": True, # enable outlier detection
        "num_sigmas": 2.5       # use 2.5 standard deviations for outliers
    }
)

print("Summary Statistics:", summary_stats)
print("Generated Profiles:", profiles)
Profiling Multiple Tables

The profiler can discover and profile multiple tables in Unity Catalog. Tables can be passed explicitly as a list or be included/excluded using regex patterns.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Profile several tables by name:
results = profiler.profile_tables(
    tables=["main.data.table_001", "main.data.table_002"]
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")

# Include tables matching specific patterns
results = profiler.profile_tables(
    patterns=["$main.*", "$data.*"]
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")

# Exclude tables matching specific patterns
results = profiler.profile_tables(
    patterns=["$sys.*", ".*_tmp"],
    exclude_matched=True
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")

Profiling Options

The profiler supports extensive configuration options to customize the profiling behavior.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Custom profiling options
custom_options = {
    # Sampling options
    "sample_fraction": 0.2,  # Sample 20% of the data
    "sample_seed": 42,       # Seed for reproducible sampling
    "limit": 2000,           # Limit to 2000 records after sampling

    # Outlier detection options
    "remove_outliers": True,              # Enable outlier detection for min/max rules
    "outlier_columns": ["price", "age"],  # Only detect outliers in specific columns
    "num_sigmas": 2.5,                    # Use 2.5 standard deviations for outlier detection

    # Null value handling
    "max_null_ratio": 0.05,  # Generate is_not_null rule if <5% nulls

    # String handling
    "trim_strings": True,     # Trim whitespace from strings before analysis
    "max_empty_ratio": 0.02,  # Generate is_not_null_or_empty if <2% empty strings

    # Distinct value analysis
    "distinct_ratio": 0.01,  # Generate is_in rule if <1% distinct values
    "max_in_count": 20,      # Maximum items in is_in rule list

    # Value rounding
    "round": True,  # Round min/max values for cleaner rules
}

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Apply custom options to profiling
summary_stats, profiles = profiler.profile(input_df, options=custom_options)

# Apply custom options when profiling tables
tables = [
    "dqx.demo.test_table_001",
    "dqx.demo.test_table_002",
    "dqx.demo.test_table_003",  # profiled with default options
]
table_options = {
    "dqx.demo.test_table_001": {"limit": 2000},
    "dqx.demo.test_table_002": {"limit": 5000},
}
summary_stats, profiles = profiler.profile_tables(tables=tables, options=table_options)

Understanding output

Assuming the sample data is:

| customer_id | customer_name | customer_email    | is_active | start_date | end_date   |
|-------------|---------------|-------------------|-----------|------------|------------|
| 1           | Alice         | alice@mainri.ca   | 1         | 2025-01-24 | null       |
| 2           | Bob           | bob_new@mainri.ca | 1         | 2025-01-24 | null       |
| 3           | Charlie       | invalid_email     | 1         | 2025-01-24 | null       |
| 3           | Charlie       | invalid_email     | 0         | 2025-01-24 | 2025-01-24 |
# Initialize the WorkspaceClient to interact with the Databricks workspace
ws = WorkspaceClient()

# Initialize a DQProfiler instance with the workspace client
profiler = DQProfiler(ws)

# Read the input data from a Delta table
input_df = spark.read.table("catalog.schema.table")

# Display a sample of the input data
input_df.display()

# Profile the input DataFrame to get summary statistics and data profiles
summary_stats, profiles = profiler.profile(input_df)

Checking the summary and profiles generated for the input data, below are the results produced by DQX:

print(summary_stats)

Summary of input data on all the columns in input dataframe

# Summary of input data on all the columns in input dataframe
{
  "customer_id": {
    "count": 4,
    "mean": 2.25,
    "stddev": 0.9574271077563381,
    "min": 1,
    "25%": 1,
    "50%": 2,
    "75%": 3,
    "max": 3,
    "count_non_null": 4,
    "count_null": 0
  },
  "customer_name": {
    "count": 4,
    "mean": null,
    "stddev": null,
    "min": "Alice",
    "25%": null,
    "50%": null,
    "75%": null,
    "max": "Charlie",
    "count_non_null": 4,
    "count_null": 0
  },
  "customer_email": {
    "count": 4,
    "mean": null,
    "stddev": null,
    "min": "alice@example.com",
    "25%": null,
    "50%": null,
    "75%": null,
    "max": "charlie@example.com",
    "count_non_null": 4,
    "count_null": 0
  },
  "is_active": {
    "count": 4,
    "mean": 0.75,
    "stddev": 0.5,
    "min": 0,
    "25%": 0,
    "50%": 1,
    "75%": 1,
    "max": 1,
    "count_non_null": 4,
    "count_null": 0
  },
  "start_date": {
    "count": 4,
    "count_non_null": 4,
    "count_null": 0,
    "min": "2025-01-24",
    "max": "2025-01-24",
    "mean": "2025-01-24"
  },
  "end_date": {
    "count": 4,
    "count_non_null": 1,
    "count_null": 3,
    "min": 1737676800,
    "max": 1737676800
  }
}

print(profiles)
# Default Data profile generated based on input data
DQProfile(
  name='is_not_null',
  column='customer_id',
  description=None,
  parameters=None
),
DQProfile(
  name='min_max',
  column='customer_id',
  description='Real min/max values were used',
  parameters={
    'min': 1,
    'max': 3
  }
),
DQProfile(
  name='is_not_null',
  column='customer_name',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='customer_email',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='is_active',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='start_date',
  description=None,
  parameters=None
)

Step 3: Understanding the checks applied to the data

With the snippet below, we can see the default checks applied to the input data, which produced the data profile shown in the previous step.

# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)

print(checks)
# Checks applied on input data
[{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_id'
    }
  },
  'name': 'customer_id_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_in_range',
    'arguments': {
      'col_name': 'customer_id',
      'min_limit': 1,
      'max_limit': 3
    }
  },
  'name': 'customer_id_isnt_in_range',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_name'
    }
  },
  'name': 'customer_name_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_email'
    }
  },
  'name': 'customer_email_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'is_active'
    }
  },
  'name': 'is_active_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'start_date'
    }
  },
  'name': 'start_date_is_null',
  'criticality': 'error'
}]

Step 4: Define custom Data Quality Expectations

In addition to the automatically generated checks, you can define your own custom rules to enforce business-specific data quality requirements. This is particularly useful when your organization has unique validation criteria that aren’t covered by the default checks. By using a configuration-driven approach (e.g., YAML), you can easily maintain and update these rules without modifying your pipeline code.

For example, you might want to enforce that:

  • Customer IDs must not be null or empty.
  • Email addresses must match a specific domain format (e.g., @example.com).
# Define custom data quality expectations.
import yaml

checks_custom = yaml.safe_load("""
- check:
    arguments:
        col_name: customer_id
    function: is_not_null_and_not_empty
    criticality: error
    name: customer_id_is_null
- check:
    arguments:
        col_name: customer_email
        regex: '^[A-Za-z0-9._%+-]+@example\.com$'
    function: regex_match
    criticality: error
    name: customer_email_is_not_valid""")
# Validate the custom data quality checks
status = DQEngine.validate_checks(checks_custom)

# The custom checks YAML can also be passed from a workspace file path, as given below:
status = DQEngine.validate_checks("path to yaml file in workspace")

# Assert that there are no errors in the validation status
assert not status.has_errors

Step 5: Applying the custom rules and generating results

Once your custom data quality rules have been defined and validated, the next step is to apply them to your input data. The DQEngine facilitates this by splitting your dataset into two categories:

  • Silver Data: Records that meet all quality expectations.
  • Quarantined Data: Records that fail one or more quality checks.

This approach allows you to separate valid and invalid data for further inspection and remediation. The valid records can proceed to downstream processes, while the quarantined records can be analyzed to determine the cause of failure (e.g., missing values, incorrect formats).

Here’s how you can apply the rules and generate the results:

# Create a DQEngine instance with the WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())

# Apply quality checks and split the DataFrame into silver and quarantine DataFrames
silver_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks_custom)
Quarantined data: records that did not match the rules.
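From here, both DataFrames can be persisted for downstream use; a minimal sketch, assuming hypothetical target table names:

# Persist both outputs as Delta tables; the table names are placeholders
silver_df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.customers_silver")
quarantine_df.write.format("delta").mode("append").saveAsTable("my_catalog.my_schema.customers_quarantine")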

Summary

In essence, data quality is no longer just an IT concern; it’s a fundamental business imperative. In today’s complex and competitive landscape, the success of an organization hinges on its ability to leverage high-quality, trusted data for every strategic and operational decision.

A Data Quality Framework (DQX) helps organizations:

  • Establish clear quality standards
  • Implement automated checks
  • Track and resolve issues
  • Ensure trust in data

https://databrickslabs.github.io/dqx/docs/motivation

https://medium.com/@nivethanvenkat28/revolutionizing-data-quality-checks-using-databricks-dqx-f2a49d83c3c6

Leveraging Microsoft Graph API for SharePoint Manipulation with Azure Data Factory or Synapse Analytics

This article will discuss a new approach for Azure Data Factory (ADF) or Synapse Analytics (ASA) to leverage the Microsoft Graph API for accessing and integrating with various Microsoft 365 services and data. Examples include ADF downloading files from SharePoint (SPO) to Azure Data Lake Storage (ADLS), creating folders in SharePoint libraries, and moving files between SharePoint folders.

We’ve previously discussed using Azure Data Factory (ADF) or Synapse Analytics (ASA) to download files from SharePoint to Azure Data Lake Storage (ADLS) Gen2. Specifically, we explored a metadata-driven approach for incrementally copying data from SharePoint Online to ADLS Gen2.

I recently received reports indicating that our previous method for downloading files from their SharePoint Online (SPO) environment is no longer working. Upon investigation, I confirmed that changes to the configuration of some SharePoint sites prevent the standard download solution from functioning.

What is the Microsoft Graph API?

The Microsoft Graph API is a unified RESTful web API provided by Microsoft that allows developers to access and integrate with a wide range of Microsoft 365 services and data. This includes data from:

  • Azure Active Directory (Entra ID)
  • Outlook (Mail, Calendar, Contacts)
  • OneDrive and SharePoint
  • Teams
  • Excel
  • Planner
  • Intune
  • To Do, and many others.

Scenario

At Mainri Corporation, colleagues upload files to a designated folder on their SharePoint site. As part of their data centralization process, files from a shared SharePoint Online folder named “Current” are copied to ADLS. Once the copy is successful, these files are then relocated from the “Current” folder to an “Archive” folder within the same SPO Library.

For this purpose, let’s utilize the mainri SharePoint Online (SPO) site, ‘IT-BA-site’ (also known as ‘IT Business Partners’), along with its dummy library and folders. The library’s name is ‘Finance’.

There are multiple folders under the Finance library; colleagues upload files to:
Finance/Business Requests/AR Aging Report/Current.
The Archive folder is: Finance/Business Requests/AR Aging Report/Archive.

Prerequisites:

An Azure AD Application (AAD) Registration with Microsoft Graph API permissions.

Because SharePoint is a protected Microsoft 365 service, ADF cannot access it directly. So you:

  1. Register an AAD App
  2. Assign it permission to read SharePoint files (Sites.Read.All, Files.Read.All)
  3. Use the AAD App credentials (client ID, secret, tenant ID) to obtain an access token
  4. Pass that token to Microsoft Graph API from ADF pipelines (using Web Activity + HTTP Binary Dataset)
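Before wiring this into ADF, it can help to validate the app registration with a short Python script. A minimal sketch using the requests library; the tenant ID, client ID, secret, and site path are placeholders you must replace:

import requests

TENANT_ID = "<your_tenant_id>"          # placeholder values
CLIENT_ID = "<your_application_id>"
CLIENT_SECRET = "<your_client_secret>"

# 1. Acquire an access token (client credentials flow)
token_resp = requests.post(
    f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "https://graph.microsoft.com/.default",
    },
)
token = token_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}

# 2. Resolve the site ID from hostname and site-relative path
site = requests.get(
    "https://graph.microsoft.com/v1.0/sites/mainri.sharepoint.com:/sites/IT-BA-site",
    headers=headers,
).json()

# 3. List the site's document libraries (drives)
drives = requests.get(
    f"https://graph.microsoft.com/v1.0/sites/{site['id']}/drives",
    headers=headers,
).json()
for d in drives["value"]:
    print(d["name"], d["id"])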

Register an Azure Active Directory Application (AAD App) in Azure

  1. Go to Azure Portal > Azure Active Directory > App registrations.
  2. Click “New registration”.
    • Name: ADF-GraphAPI-App
    • Supported account types: Single tenant.
  3. Click Register.

From the app registration, we want to get:

  • Client ID: Unique ID of your app — used in all API calls
  • Tenant ID: Your Azure AD directory ID
  • Client Secret: Password-like value — proves app identity
  • Permissions: Defines what APIs the app is allowed to access

Grant Graph API Permissions

Go to the API permissions tab.

Click “Add a permission” > Microsoft Graph > Application permissions.

Add these (at minimum):

  • Sites.Read.All – to read SharePoint site content.
  • Files.Read.All – to read files in document libraries.

Click “Grant admin consent” to enable these permissions.

Solution

The ADF major steps and activities are:

Register an Azure AD Application (if not using Managed Identity) and grant it the necessary Microsoft Graph API permissions, specifically Sites.Selected.

Alternatively (recommended), enable Managed Identity for your ADF and grant the ADF’s managed identity the same Microsoft Graph API permissions, specifically Sites.Selected.

Create an HTTP Linked Service in ADF.

Base URL:
https://graph.microsoft.com/v1.0
Web Activity to get an access token
URL: https://login.microsoftonline.com/<your_tenant_id>/oauth2/token
Method: POST 

Body: (for Service Principal authentication)
JSON
{
 "grant_type": "client_credentials",
 "client_id": "<your_application_id>",
 "client_secret": "<your_client_secret>",
 "resource": "https://graph.microsoft.com"
}

Authentication: None
Headers: Content-Type: application/x-www-form-urlencoded
Web Activity to get the Site ID
URL: https://graph.microsoft.com/v1.0/sites/<your_sharepoint_domain>:/sites/<your_site_relative_path> (e.g., https://mainri.sharepoint.com:/sites/finance)

Method: GET 

Authentication: none

header: "Authorization" 
@concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)

Web Activity to list the drives (document libraries)

URL: @concat('https://graph.microsoft.com/v1.0/sites/', activity('<your_get_site_id_activity_name>').output.id, '/drives')
Method: GET

Authentication: none

header: "Authorization" 
@concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)

Web Activity (or ForEach Activity with a nested Web Activity) to list the items (files and folders) in a specific drive/folder:

URL: @concat('https://graph.microsoft.com/v1.0/drives/', '<your_drive_id>', '/items/<your_folder_id>/children') (You might need to iterate through folders recursively if you have nested structures).

Method: GET

Authentication: none

header: "Authorization" 
@concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)

Copy Activity to download the file content

Source: HTTP Binary
Relative URL: @item()['@microsoft.graph.downloadUrl']
method: GET

Sink: Configure a sink to your desired destination (e.g., Azure Blob Storage, Azure Data Lake Storage). Choose a suitable format (Binary for files as-is).

Finally, a Web Activity moves the processed file to the Archive area.

URL: https://graph.microsoft.com/v1.0/drives/<your_drive_id>/items/<your_file_item_id>

Method: PATCH

Body:
@json(
    concat(
        '{ "parentReference": { "id": "'
        , activity('Create_rundate_folder_under_archive').output.id
        , '" } }'
    )
)

Step 1: Get Client Secret from Azure Key Vault

Web activity
purpose: get the client secret stored in Azure Key Vault.

Url: @{concat(
         pipeline().parameters.pl_par_KeyVault_URL
        , '/secrets/'
        , pipeline().parameters.pl_par_keyVault_SecretsName
        , '?api-version=7.0'
)}

Method: GET

Step 2: Get SPO access Bearer Token

Web activity
purpose: all Graph API calls must include the Bearer access token

URL: 
@concat(
    'https://login.microsoftonline.com/'
    , pipeline().parameters.pl_par_Tenant_id
    , '/oauth2/v2.0/token'
)

Method: POST

Body: 
@concat(
  'client_id=', pipeline().parameters.pl_par_Client_id,
  '&client_secret=', activity('Get_Client_Secret').output.value,
  '&grant_type=client_credentials',
  '&scope=https://graph.microsoft.com/.default'
)

Header: Content-Type : application/x-www-form-urlencoded

The response has expires_in and ext_expires_in

expires_in: This tells you how many seconds the access token is valid for; in this case, 3599 seconds (just under 1 hour). After this time, the token expires and can no longer be used to call the Graph API.

ext_expires_in: This is the extended expiry time. It represents how long the token can still be accepted by some Microsoft services (under specific circumstances) after it technically expires. This allows some apps to use the token slightly longer depending on how token caching and refresh policies are handled.

For production apps, you should acquire a new token before expires_in elapses; note that the client credentials flow does not issue refresh tokens, so you simply request a fresh one.

Save the token in a variable, as we will use it in subsequent activities.

Set variable activity
Purpose: for the convenience of the following activities, save it in a variable.

@activity('GetBearerToken').output.access_token

Step 3: Get SPO site ID via Graph API by use Bearer Token

Sometimes your SharePoint / M365 administrator may give you the Site ID. If you do not have it, you can obtain it this way.

Web activity
purpose: We will first obtain the “site ID,” a key value used for all subsequent API calls.

URL:
@concat('https://graph.microsoft.com/v1.0/sites/'
, pipeline().parameters.pl_par_Tenant_name
,'.sharepoint.com:/sites/'
, pipeline().parameters.pl_par_SPO_site
)

Method: GET

Header: 
Authorization
@concat('Bearer ',activity('GetBearerToken').output.access_token)

output segments:

{ ...
  "id": "mainri.sharepoint.com,73751907-6e2b-4228-abcd-3c321f3e00ec,bee66b00-7233-9876-5432-2f0c059ec551",
... }

From the output, we can see that the ID has three parts; the entire three-part string is the site ID. It is a composite of:

  • hostname (e.g., mainri.sharepoint.com)
  • site collection ID (a GUID)
  • site ID (another GUID)

Step 4: Get SPO Full Drives list via Graph API by using the Bearer Token

A “drive” is also known as a library.

Web activity
purpose: Since there are multiple drives/libraries in the SPO site, list out all the drives and find the one we are interested in: Finance.

Url:
 @concat('https://graph.microsoft.com/v1.0/sites/'
, activity('GetSiteId').output.id
, '/drives')

Method: GET

Header: Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)

Step 5: Filter out the library of interest, “Finance”, via Graph API by using the Bearer Token

Filter Activity
Purpose: Find the Finance library’s ID.
Since there are multiple drives/libraries in the SPO site, we are interested in “Finance” only.

Items: @activity('GetFullDrives').output.value
Condition: @equals(item().name, 'Finance')

Save the Drive/Library ID in a variable.

We will use the Library/Drive ID in the following activities, so save it for convenience.

Step 6: Use the Bearer Token with the Graph API to retrieve a list of subfolders in the Finance library’s root.

Web Activity
Purpose: Navigate to the ‘Business Requests’ folder among the Finance library’s sub-folders and retrieve its ID.

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/root/children')

Method: GET

Header: Authorization 
@concat('Bearer ',activity('GetBearerToken').output.access_token)


Step 7: Find the ‘Business Requests’ sub-folder under ‘Finance’.

Filter activity
Purpose: Find the “Business Requests” folder ID.

Items: @activity('Get_FinanceRoot').output.value
Condition: @equals(item().name, 'Business Requests')


Step 8: Get “Business Requests” child items via Graph API

Check the sub-folders within ‘Business Requests’ and identify the one titled ‘AR Aging Report’, as that is our focus.

Web Activity
Purpose: Get ‘Business Requests’ child items using Graph API.

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
, activity('FilterBusinessRequest').output.value[0].id
, '/children'
)

Method: GET

Header:
Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)


Step 9: Filter out the sub-folder “AR Aging Report” from ‘Business Requests’ via Graph API

Filter Activity
Purpose: There may be multiple child items under this folder; we are interested only in "AR Aging Report" and its ID.

Items: @activity('GetBusinessRequests_Children').output.value
Condition: @equals(item().name, 'AR Aging Report')

output segment

Step 10: Get “AR Aging Report” child items list

Web Activity
Purpose: To continue, we need the IDs of two folders indicated by the business:
  • "Current", where colleagues upload files
  • "Archive", where processed files are saved

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
, activity('Filter_AR_Aging_Report').output.value[0].id
, '/children'
)

Method: GET

Header:
Authorization : @concat('Bearer ',activity('GetBearerToken').output.access_token)

output segments

Step 11: Find the ‘Current’ folder under ‘AR Aging Report’

Filter Activity
Purpose: Find the "Current" folder that holds the uploaded files.

Items: @activity('Get AR Aging Children').output.value
Condition: @equals(item().name, 'Current')

output segments

Save the Current folder ID in variable

For convenience in the following activities, save the "Current" folder ID in a variable.

Value: @activity('Filter_Current').output.value[0].id

Step 12: Obtain a list of all files within the ‘Current’ folder.

Web Activity
Purpose: Retrieve a list of files from the ‘Current’ folder to be copied to ADLS.

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
, activity('Filter_Current').output.value[0].id
, '/children'
)

Method: GET

Header: Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)

Step 13: Check whether the "Current" folder has any files.

If Condition Activity
Purpose: Check whether the "Current" folder contains any files to process. If it does, we copy and archive them; if it is empty, the remaining steps are skipped. (The check for today's archive folder comes later, in Steps 15-16.)

Expression:
@greater(
    length(activity('Get_Current_Files').output.value)
    , 0
)

Within the If-Condition's "True" branch, there are more actions to take.

Since new files are in ‘Current’, we will now COPY and ARCHIVE them.

  • Find the "Archive" folder and get its ID
  • List all child items under the "Archive" folder
  • Run 'pl_child_L1_chk_rundate_today_exists' to see if 'rundate=2025-05-13' exists. If so, skip creation; if not, create 'rundate=2025-05-13' under 'Archive'.
  • Get all items in 'Archive' and identify the ID of the 'rundate=2025-05-13' folder.
  • Then run 'pl_child_L1_copy_archive' to transfer SPO data to ADLS and archive 'Current' to 'Archive/rundate=2025-05-13'.

Begin implementing the actions outlined above.

We begin the process by checking the ‘Current’ folder to identify any files for copying. It’s important to note that this folder might not contain any files at this time.

Step 14: Inside the ‘chk-has-files’ IF-Condition activity, find the ID of ‘Archive’ using a filter.

Filter activity
Purpose: Find “Archive” ID

Items: @activity('Get AR Aging Children').output.value
Condition: @equals(item().name, 'Archive')

Output segments

Step 15: Get Archive’s child items list

Web Activity
Purpose: Check the sub-folders in 'Archive' for 'rundate=2025-05-13' to determine whether it needs to be created.

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
, activity('Filter_Archive').output.value[0].id
, '/children'
)

Method: GET

Header:
Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)

With the steps above, we have found everything we need:

  • Site ID
  • Drive/Library ID
  • sub-folder ID(s)

We will now create a sub-folder under "Archive" with the naming pattern rundate=<yyyy-MM-dd>, for example:
rundate=2024-09-28

The "Archive" folder looks like this:
../Archive/rundate=2024-09-28
...
../Archive/rundate=2024-11-30
...
etc.

Processed files are archived in sub-folders named by their processing date.
../Archive/rundate=2024-09-28/file1.csv
../Archive/rundate=2024-09-28/file2.xlsx

etc.

The Microsoft Graph API will return an error if we attempt to create a folder that already exists. Therefore, we must first verify the folder’s existence before attempting to create it.

As part of today’s data ingestion from SPO to ADLS (May 13, 2025), we need to determine if an archive sub-folder for today’s date already exists. We achieve this by listing the contents of the ‘Archive’ folder and checking for a sub-folder named ‘rundate=2025-05-13‘. If this sub-folder is found, we proceed with the next steps. If it’s not found, we will create a new sub-folder named ‘rundate=2025-05-13‘ within the ‘Archive’ location.
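
Steps 16-19 below implement this existence check in ADF. For readers who want to prototype the same pattern outside the pipeline, here is a minimal Python sketch against the Graph endpoints shown above (requests library assumed; function and variable names are hypothetical):

import requests

def ensure_rundate_folder(drive_id: str, archive_id: str, rundate: str, token: str) -> str:
    """Return the ID of Archive/<rundate>, creating the folder only if it is missing."""
    base = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{archive_id}/children"
    headers = {"Authorization": f"Bearer {token}"}

    # 1. List Archive's children and look for an existing rundate folder.
    #    (Pagination via @odata.nextLink is omitted for brevity.)
    children = requests.get(base, headers=headers, timeout=30)
    children.raise_for_status()
    for item in children.json().get("value", []):
        if item.get("name") == rundate:
            return item["id"]  # already exists - skip creation

    # 2. Not found: create it, mirroring the POST body used in Step 19.
    created = requests.post(
        base,
        headers={**headers, "Content-Type": "application/json"},
        json={"name": rundate, "folder": {}},
        timeout=30,
    )
    created.raise_for_status()
    return created.json()["id"]

# e.g. ensure_rundate_folder(drive_id, archive_id, 'rundate=2025-05-13', token)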

Step 16: Identify the folder named ‘rundate=<<Today Date>>’

Filter activity
Purpose: Verify the existence of 'rundate=2025-05-13' to determine whether today's archive folder needs to be created.

Items: @activity('Get Archive Children').output.value
Condition: @equals(item().name, 
concat(
    'rundate='
    , formatDateTime(convertTimeZone(utcNow(),'UTC' ,'Eastern Standard Time' ),'yyyy-MM-dd')
    )
)

Output segments

(We can see that today's folder does not exist.)

Within the If-Condition activity, if the check for today's folder returns false (meaning it does not exist), we proceed to create a new folder named 'rundate=2025-05-13'.

Step 17: Execute Child pipeline pl_child_L1_chk_rundate_today_exists

Execute Pipeline Activity
Purpose: Check whether the rundate=<<Today date>> sub-folder exists; if it does not, create it.

Pass parameters to Child Pipeline:
> child_para_Filter_Archive_rundate_today   ARRAY   @activity('Get Archive Children').output.value
> child_para_bearer_token    string   @variables('var_Bearer_Token')
> child_para_rundate    string   @variables('var_rundate')
> child_para_FinanceID   string   @variables('var_financeID')
> child_para_archiveID   string   @variables('var_ArchiveID')

Step 18: Check if “rundate=<<Today date>>” exists or not

In the Child Pipeline: pl_child_L1_chk_rundate_today_exists

If-Condition Activity
Purpose: Verify the existence of the ‘rundate=<<Today Date>>‘ sub-folder; create it if it doesn’t exist.

Child pipeline parameters:
> child_para_Filter_Archive_rundate_today    ARRAY 
> child_para_bearer_token   string
> child_para_rundate   string
> child_para_FinanceID   string
> child_para_archiveID   string

Step 19: Create a new folder in “Archive” via Graph API

Inside the If-Condition activity's TRUE branch

Web Activity
Purpose: Create a new folder named rundate=<today date>,
e.g. rundate=2025-05-13

URL:
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
,  variables('var_ArchiveID')
, '/children'
)

Method: POST

Body:
@json(
concat(
    '{"name": "', variables('var_rundate'), '",  "folder":{} }'
))

Header:
Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)

Content-Type: application/json

output segments

After creating the new folder, we will extract its ID. This ID will then be stored in a variable for use during the “Archiving” process.

Step 20: Get “Archive” folder child Items Again

As the 'rundate=2025-05-13' folder could have been created either in the current pipeline run (during a previous step) or in an earlier execution today, we re-retrieve the child items of the 'Archive' folder to make sure we have the most up-to-date ID.

In the main pipeline, inside the If-Condition's TRUE branch.

Web Activity

URL: 
@concat('https://graph.microsoft.com/v1.0/drives/'
, activity('FilterFinance').output.value[0].id
, '/items/'
, activity('Filter_Archive').output.value[0].id
, '/children'
)

Method: GET

Header:
Authorization:  @concat('Bearer ',activity('GetBearerToken').output.access_token)

The output is similar to Step 15 (Get Archive's child items list).

Step 21: Find the 'rundate=2025-05-13' folder in Archive and get its ID again.

Within the main pipeline, inside the If-Condition's TRUE branch.

Filter Activity
Purpose: Retrieve the ID of the sub-folder named "rundate=2025-05-13".

Items: @activity('Get Archive Children').output.value
Condition: @equals(item().name, 
variables('var_rundate')
)

The output is similar to Step 16 (filter for today's rundate folder).

Save rundate=<<Today Date>> ID in a variable

Next, we retrieve the files from the ‘Current’ SPO folder for copying to ADLS.

Step 22: Execute child pipeline pl_child_L1_copy_archive

In the main pipeline, inside the If-Condition's TRUE branch,

Execute Activity
Purpose: Implement the process to copy files from SPO to ADLS and archive processed files from the “Current” folder in SPO to the “Archive” folder in SPO.

Pass parameters to child pipeline pl_child_L1_copy_archive:
> child_para_tar_path   string   @pipeline().parameters.pl_tar_path
> child_para_version   string   @pipeline().parameters.pl_par_version
> child_para_item   string   @variables('var_current_files_list')
> child_para_Bearer_Token   string   @variables('var_Bearer_Token')
> child_para_rundateTodayID   string   @variables('var_rundate_today_FolderID')
> child_para_FinanceID   string   @variables('var_financeID')

Execute child pipeline pl_child_L1_copy_archive

In the child pipeline pl_child_L1_copy_archive

ForEach activity
Purpose: Iterate through each item in the file list.

Items: @pipeline().parameters.child_para_item

The ForEach loop will iterate through each file, performing the following actions: copying the file to Azure Data Lake Storage and subsequently moving the processed file to the ‘Archive’ folder.

Step 23: Copy file to ADLS

Copy activity
Purpose: copy the files to ADLS one by one.

Linked Service:

HTTP linked service, using Anonymous authentication.

Source DataSet:

HTTP Binary dataset,

Sink Dataset

Azure Data Lake Storage Gen 2, Binary

Linked Services

Azure Data Lake Storage Gen 2

Sink dataset parameters:
ds_par_dst_path   string
ds_par_dst_fname   string

Step 24: Move processed file to “Archive” Folder

In the child pipeline pl_child_L1_copy_archive

Web Activity
Purpose: Move the processed file from the "Current" folder to the "Archive" area.

URL:
@concat(
    'https://graph.microsoft.com/v1.0/drives/'
    , pipeline().parameters.child_para_FinanceID
    ,'/items/'
    , item().id
)

Method: PATCH

Body:
@json(
concat(
'{ "parentReference": {'
    , '"id": "'
    , activity('Create_rundate_folder_under_archive').output.id
    , '"}}'
))

Headers:
Authorization: @concat('Bearer ',activity('GetBearerToken').output.access_token)

Summary

The pipeline includes checks for folder existence to avoid redundant creation, especially in scenarios where the pipeline might be re-run. The IDs of relevant folders (“Archive” and the date-specific sub-folders) are retrieved dynamically throughout the process using filtering and list operations.

In essence, this pipeline automates the process of taking newly uploaded files, transferring them to ADLS, and organizing them within an archive structure based on the date of processing.
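
To prototype the core copy-and-archive loop outside ADF, here is a hedged Python sketch built from the same Graph endpoints used above; the @microsoft.graph.downloadUrl property it relies on appears in step 8 of the recap below, and the local file write is a stand-in for the ADLS sink (names are hypothetical):

import requests

def copy_and_archive(drive_id: str, current_id: str, dest_folder_id: str, token: str, sink_dir: str) -> None:
    """Download each file in 'Current', then move it under the rundate folder."""
    headers = {"Authorization": f"Bearer {token}"}
    children_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{current_id}/children"

    listing = requests.get(children_url, headers=headers, timeout=30)
    listing.raise_for_status()

    for item in listing.json().get("value", []):
        if "file" not in item:
            continue  # skip sub-folders

        # 1. Copy: download the content via the short-lived, pre-authenticated URL.
        content = requests.get(item["@microsoft.graph.downloadUrl"], timeout=120)
        content.raise_for_status()
        with open(f"{sink_dir}/{item['name']}", "wb") as f:
            f.write(content.content)

        # 2. Archive: PATCH parentReference to move the item (same call as Step 24).
        move = requests.patch(
            f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item['id']}",
            headers={**headers, "Content-Type": "application/json"},
            json={"parentReference": {"id": dest_folder_id}},
            timeout=30,
        )
        move.raise_for_status()

The numbered recap below summarizes the setup end to end: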

  1. Register an Azure AD application (if not using Managed Identity) and grant it the necessary Microsoft Graph API permissions, specifically Sites.Selected.
  2. Enable Managed Identity for your ADF (recommended) and grant the ADF's managed identity the necessary Microsoft Graph API permissions, specifically Sites.Selected.
  3. Create an HTTP Linked Service in ADF, Base URL: https://graph.microsoft.com/v1.0
  4. Web Activity to get an access token
    URL: https://login.microsoftonline.com/<your_tenant_id>/oauth2/token
    Method: POST
    Body: (for Service Principal authentication)
    JSON
    { "grant_type": "client_credentials",
    "client_id": "<your_application_id>",
    "client_secret": "<your_client_secret>",
    "resource": "https://graph.microsoft.com"
    }
    Authentication: None
    Headers: Content-Type: application/x-www-form-urlencoded
  5. Get the Site ID
    Web Activity to get the Site ID
    URL: https://graph.microsoft.com/v1.0/sites/<your_sharepoint_domain>:/sites/<your_site_relative_path>
    (e.g., https://mainri.sharepoint.com:/sites/finance)
    Method: GET
    Authentication: None
    Header: "Authorization"
    @concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)
  6. Web Activity to list the drives (document libraries)
    URL: @concat('https://graph.microsoft.com/v1.0/sites/', activity('<your_get_site_id_activity_name>').output.id, '/drives')
    Method: GET
    Authentication: None
    Header: "Authorization"
    @concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)
  7. Web Activity (or ForEach Activity with a nested Web Activity) to list the items (files and folders) in a specific drive/folder
    URL: @concat('https://graph.microsoft.com/v1.0/drives/', '<your_drive_id>', '/items/<your_folder_id>/children') (You might need to iterate through folders recursively if you have nested structures.)
    Method: GET
    Authentication: None
    Header: "Authorization"
    @concat('Bearer ', activity('<your_get_token_activity_name>').output.access_token)
  8. Copy Activity to download the file content
    Source: HTTP Binary
    Relative URL: @item()['@microsoft.graph.downloadUrl']
    Method: GET
    Sink: Configure a sink to your desired destination (e.g., Azure Blob Storage, Azure Data Lake Storage). Choose a suitable format (Binary for files as-is).
  9. Finally, a Web Activity moves the processed file to the Archive area.
    URL: https://graph.microsoft.com/v1.0/drives/<your_drive_id>/items/<your_file_item_id>
    Method: PATCH
    Body:
    @json(concat(
    '{ "parentReference": {', '"id": "'
    , activity('Create_rundate_folder_under_archive').output.id, '"}}'))

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

What is Service Principal ID, Application ID, Client ID, Tenant ID

Service Principal ID

What is Service Principal?

A Service Principal is a security identity in Azure Active Directory (Entra ID) that represents an application or service, allowing it to log in and access Azure resources — just like a user, but for apps or automation.

What is a Service Principal ID?

The Service Principal ID is the unique identifier (GUID) assigned to the Service Principal object in Azure Active Directory (Entra ID).

Application ID

Also known as: App ID

What the Application ID is:

A globally unique identifier for the Azure AD Application registration.

Scope: Refers to the actual application definition in Azure AD.

Example use: When configuring authentication for apps (e.g., OAuth2, OpenID), you often use the Application ID.

Client ID

Also known as: App ID (again!)

What the Client ID is:

Client ID: This is actually the same as the Application ID in most contexts.

Why it’s called “Client ID”: In OAuth2 terminology, the application (a “client”) gets a Client ID and Client Secret.

Example use: When an app authenticates using OAuth2, it presents the Client ID and secret.
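
As a minimal sketch of that flow with the MSAL library (the placeholder IDs are yours to fill in):

import msal

app = msal.ConfidentialClientApplication(
    client_id="<your_application_id>",          # Application (client) ID
    client_credential="<your_client_secret>",   # Client Secret
    authority="https://login.microsoftonline.com/<your_tenant_id>",
)
# Client-credentials flow: the app authenticates as itself, no user involved.
result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
access_token = result["access_token"]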

Quick Comparison Table:

Term                 | Alias          | Refers To                             | Example Use
Application ID       | Client ID      | App registration in Azure AD          | App registration, API authentication
Client ID            | Application ID | OAuth2 client ID for authentication   | Login with Azure AD
Service Principal ID | Object ID      | Azure AD identity for app in a tenant | Assign RBAC roles, permissions

Key Properties

Property      | Description
Client ID     | The App (Application) ID of the Service Principal
Tenant ID     | The Azure AD tenant where the identity resides
Object ID     | The unique ID of the Service Principal itself
Client Secret | Password-like credential used for authentication

Breakdown of IDs:

So: Client ID == Application ID

Name                 | Also Called As     | What it Represents
Application ID       | App ID / Client ID | The registered app in Azure AD
Service Principal ID | Object ID          | The specific identity of the app in the tenant
Tenant ID            | (none)             | The Azure AD directory (organization)

Unity Catalog – Table Type Comparison

In Azure Databricks Unity Catalog, you can create different types of tables depending on your storage and management needs. The main table types include Managed Tables, External Tables, Delta Tables, Foreign Tables, Streaming Tables, Delta Live Tables (which replaced the deprecated Live Tables), Feature Tables, and Hive Tables (legacy). Each table type is explained in detail below, and a side-by-side comparison is provided for clarity.

Side-by-Side Comparison Table

Feature         | Managed Tables            | External Tables             | Delta Tables                                     | Foreign Tables                           | Streaming Tables        | Delta Live Tables (DLT) | Feature Tables                              | Hive Tables (Legacy)
Storage         | Databricks-managed        | External storage            | Managed/External                                 | External database                        | Databricks-managed      | Databricks-managed      | Managed/External                            | Managed/External
Location        | Internal Delta Lake       | Specified external path     | Internal/External Delta Lake                     | External metastore (Snowflake, BigQuery) | Internal Delta Lake     | Internal Delta Lake     | Internal/External Delta Lake                | Internal/External storage
Ownership       | Databricks                | User                        | Databricks/User                                  | External provider                        | Databricks              | Databricks              | Databricks/User                             | Databricks (legacy Hive metastore)
Deletion Impact | Deletes data & metadata   | Deletes only metadata       | Depends (managed: deletes; external: keeps data) | Deletes only metadata reference          | Deletes data & metadata | Deletes data & metadata | Deletes metadata (but not feature versions) | Similar to Managed/External
Format          | Delta Lake                | Parquet, CSV, JSON, Delta   | Delta Lake                                       | Snowflake, BigQuery, Redshift, etc.      | Delta Lake              | Delta Lake              | Delta Lake                                  | Parquet, ORC, Avro, CSV
Use Case        | Full lifecycle management | Sharing with external tools | Advanced data versioning & ACID compliance      | Querying external DBs                    | Continuous data updates | ETL pipelines           | ML feature storage                          | Legacy storage (pre-Unity Catalog)

Table types in detail

1. Managed Tables

Managed tables are tables where both the metadata and the data are managed by Unity Catalog. When you create a managed table, the data is stored in the default storage location associated with the catalog or schema.

Data Storage and Location:

Unity Catalog manages both the metadata and the underlying data in a Databricks-managed location.

The data is stored in a Unity Catalog-managed storage location, typically internal Delta Lake storage (e.g., DBFS or Azure Data Lake Storage).

Use Case:

Ideal for Databricks-centric workflows where you want Databricks to handle storage and metadata.

Pros & Cons:

Pros: Easy to manage, no need to worry about storage locations.

Cons: Data is tied to Databricks, making it harder to share externally.

Example:

CREATE TABLE managed_table (
    id INT,
    name STRING
);

INSERT INTO managed_table VALUES (1, 'Alice');

SELECT * FROM managed_table;

2. External Tables

External tables store metadata in Unity Catalog but keep data in an external storage location (e.g., Azure Blob Storage, ADLS, S3).

Data storage and Location:

The metadata is managed by Unity Catalog, but the actual data remains in external storage (like Azure Data Lake Storage Gen2 or an S3 bucket).

You must specify an explicit storage location (e.g., Azure Blob Storage, ADLS, S3).

Use Case:

Ideal for cross-platform data sharing or when data is managed outside Databricks.

Pros and Cons

Pros: Data is decoupled from Databricks, making it easier to share.

Cons: Requires manual management of external storage and permissions.

Preparing to create an external table

Before you can create an external table, you must create a storage credential that allows Unity Catalog to read from and write to the path on your cloud tenant, and an external location that references it.

Requirements
  • In Azure, create a service principal and grant it the Storage Blob Data Contributor role on your storage container.
  • In Azure, create a client secret for your service principal. Make a note of the client secret, the directory ID, and the application ID for the client secret.
Step 1: Create a storage credential

You can create a storage credential using the Catalog Explorer or the Unity Catalog CLI. Follow these steps to create a storage credential using Catalog Explorer.

  1. In a new browser tab, log in to Databricks.
  2. Click Catalog.
  3. Click Storage Credentials.
  4. Click Create Credential.
  5. Enter example_credential for the name of the storage credential.
  6. Set Client Secret, Directory ID, and Application ID to the values for your service principal.
  7. Optionally enter a comment for the storage credential.
  8. Click Save.
    Leave this browser open for the next steps.
Step 2: Create an external location

An external location references a storage credential and also contains a storage path on your cloud tenant. The external location allows reading from and writing to only that path and its child directories. You can create an external location from Catalog Explorer, a SQL command, or the Unity Catalog CLI. Follow these steps to create an external location using Catalog Explorer.

  1. Go to the browser tab where you just created a storage credential.
  2. Click Catalog.
  3. Click External Locations.
  4. Click Create location.
  5. Enter example_location for the name of the external location.
  6. Enter the storage container path that the location allows reading from or writing to.
  7. Set Storage Credential to example_credential, the storage credential you just created.
  8. Optionally enter a comment for the external location.
  9. Click Save.
-- Create an example catalog and schema to contain the new table
CREATE CATALOG IF NOT EXISTS example_catalog;
USE CATALOG example_catalog;
CREATE SCHEMA IF NOT EXISTS example_schema;
USE example_schema;

-- Grant access to create tables in the external location
-- (the catalog and schema must exist before they can be granted on)
GRANT USE CATALOG
ON example_catalog
TO `all users`;

GRANT USE SCHEMA
ON example_catalog.example_schema
TO `all users`;

GRANT CREATE EXTERNAL TABLE
ON LOCATION example_location
TO `all users`;

-- Create a new external Unity Catalog table from an existing table
-- Replace <bucket_path> with the storage location where the table will be created
CREATE TABLE IF NOT EXISTS trips_external
LOCATION 'abfss://<bucket_path>'
AS SELECT * FROM samples.nyctaxi.trips;

-- To use a storage credential directly, add 'WITH (CREDENTIAL <credential_name>)' to the SQL statement.

Some useful Microsoft documentation for reference:

Create an external table in Unity Catalog
Configure a managed identity for Unity Catalog
Create a Unity Catalog metastore
Manage access to external cloud services using service credentials
Create a storage credential for connecting to Azure Data Lake Storage Gen2
External locations

Example

CREATE TABLE external_table (
    id INT,
    name STRING
)
LOCATION 'abfss://container@storageaccount.dfs.core.windows.net/path/to/data';

INSERT INTO external_table VALUES (1, 'Bob');

SELECT * FROM external_table;

3. Foreign Tables

Foreign tables reference data stored in external systems (e.g., Snowflake, Redshift) without copying the data into Databricks.

Data Storage and Location

The metadata is stored in Unity Catalog, but the data resides in another metastore (e.g., an external data warehouse like Snowflake or BigQuery).

It does not point to raw files but to an external system.

Use Case:

Best for querying external databases like Snowflake, BigQuery, Redshift without moving data.

Pros and Cons

Pros: No data duplication, seamless integration with external systems.

Cons: Performance depends on the external system's capabilities.

Example

-- Unity Catalog surfaces foreign tables through Lakehouse Federation:
-- first create a connection to the external system, then a foreign catalog
-- that mirrors it. (Sketch with placeholder values; foreign tables are
-- exposed through the foreign catalog rather than created one by one.)
CREATE CONNECTION snowflake_conn TYPE snowflake
OPTIONS (
    host 'snowflake-account-url',
    port '443',
    user 'user',
    password 'password',
    sfWarehouse 'warehouse'
);

CREATE FOREIGN CATALOG snowflake_catalog
USING CONNECTION snowflake_conn
OPTIONS (database 'database');

SELECT * FROM snowflake_catalog.schema.table;

SELECT * FROM foreign_table;

4. Delta Tables

Delta tables use the Delta Lake format, providing ACID transactions, scalable metadata handling, and data versioning.

Data Storage and Location

A special type of managed or external table that uses Delta Lake format.

Can be in managed storage or external storage.

Use Case:

Ideal for reliable, versioned data pipelines.

Pros and Cons

Pros: ACID compliance, time travel, schema enforcement, efficient upserts/deletes.

Cons: Slightly more complex due to Delta Lake features.

Example

CREATE TABLE delta_table (
    id INT,
    name STRING
)
USING DELTA
LOCATION 'abfss://container@storageaccount.dfs.core.windows.net/path/to/delta';

INSERT INTO delta_table VALUES (1, 'Charlie');

SELECT * FROM delta_table;

-- Time travel example
SELECT * FROM delta_table VERSION AS OF 1;
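
The same time-travel read works from PySpark; a minimal sketch using the path from the example above:

# Read version 1 of the Delta table by path
df_v1 = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load("abfss://container@storageaccount.dfs.core.windows.net/path/to/delta")
)
df_v1.show()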

5. Feature Tables

Feature tables are used in machine learning workflows to store and manage feature data for training and inference.

Data Storage and Location

Used for machine learning (ML) feature storage with Databricks Feature Store.

Can be managed or external.

Use Case:

Ideal for managing and sharing features across ML models and teams.

Pros and Cons:

Pros: Centralized feature management, versioning, and lineage tracking.

Example:

from databricks.feature_store import FeatureStoreClient
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

fs = FeatureStoreClient()

# create_table expects a StructType schema (not a DDL string)
fs.create_table(
    name="feature_table",
    primary_keys=["id"],
    schema=StructType([
        StructField("id", IntegerType(), False),
        StructField("feature1", FloatType(), True),
        StructField("feature2", FloatType(), True),
    ]),
    description="Example feature table",
)

fs.write_table(name="feature_table", df=df, mode="overwrite")
features = fs.read_table(name="feature_table")

6. Streaming Tables

Streaming tables are designed for real-time data ingestion and processing using Structured Streaming.

Data Location:

Can be stored in managed or external storage.

Use Case:

Ideal for real-time data pipelines and streaming analytics.

Pros and Cons

Pros: Supports real-time data processing, integrates with Delta Lake for reliability.

Cons: Requires understanding of streaming concepts and infrastructure.

Example:

CREATE TABLE streaming_table (
    id INT,
    name STRING
)
USING DELTA;

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Read a Delta path as a stream and append to the streaming table's location
streaming_df = spark.readStream.format("delta").load("/path/to/delta")
(streaming_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint")  # required for streaming writes
    .start("/path/to/streaming_table"))

7. Delta Live Tables (DLT)

Delta Live Tables (DLT) is the modern replacement for Live Tables. It is a framework for building reliable, maintainable, and scalable ETL pipelines using Delta Lake. DLT automatically handles dependencies, orchestration, and error recovery.

Data storage and Location:

Data is stored in Delta Lake format, either in managed or external storage.

Use Case:

Building production-grade ETL pipelines for batch and streaming data.

  • DLT pipelines are defined using Python or SQL.
  • Tables are automatically materialized and can be queried like any other Delta table.

Pros and Cons

Pros:
  • Declarative pipeline definition.
  • Automatic dependency management.
  • Built-in data quality checks and error handling.
  • Supports both batch and streaming workloads.

Cons: Requires understanding of Delta Lake and ETL concepts.

Example

import dlt

@dlt.table
def live_table():
    return spark.read.format("delta").load("/path/to/source_table")
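
Hypothetically, the built-in data quality checks mentioned above are attached as expectations; a short sketch (the source table name raw_orders is illustrative):

import dlt
from pyspark.sql.functions import col

@dlt.table(comment="Orders with basic quality rules applied")
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")  # drop rows that fail the rule
def clean_orders():
    # dlt.read_stream consumes another table in the same pipeline incrementally
    return dlt.read_stream("raw_orders").where(col("amount") > 0)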

8. Hive Tables (Legacy)

Hive tables are legacy tables that use the Apache Hive format. They are supported for backward compatibility.

Data storage Location:

Can be stored in managed or external storage.

Use Case:

Legacy systems or migration projects.

Pros and Cons

  • Pros: Backward compatibility with older systems.
  • Cons: Lacks modern features like ACID transactions and time travel.

Example

CREATE TABLE hive_table (
    id INT,
    name STRING
)
STORED AS PARQUET;

INSERT INTO hive_table VALUES (1, 'Dave');
SELECT * FROM hive_table;

Final Thoughts

Use Delta Live Tables for automated ETL pipelines.

Use Feature Tables for machine learning models.

Use Foreign Tables for querying external databases.

Avoid Hive Tables unless working with legacy systems.

Summary

  • Managed Tables: Fully managed by Databricks, ideal for internal workflows.
  • External Tables: Metadata managed by Databricks, data stored externally, ideal for cross-platform sharing.
  • Delta Tables: Advanced features like ACID transactions and time travel, ideal for reliable pipelines.
  • Foreign Tables: Query external systems without data duplication.
  • Streaming Tables: Designed for real-time data processing.
  • Feature Tables: Specialized for machine learning feature management.
  • Hive Tables: Legacy format, not recommended for new projects.

Each table type has its own creation syntax and usage patterns, and the choice depends on your specific use case, data storage requirements, and workflow complexity.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Data Flow: Alter Row Transformation

The Alter Row transformation in ADF modifies data rows in a data flow, handling insert, update, delete, and upsert operations. You define a condition for each operation, and the transformation applies the matching change to the destination dataset. It works with sinks that support CRUD operations (primarily SQL-based sinks) and is configured in the mapping data flow: map input columns to target columns, set policies for row changes, and use expressions for conditional logic. It is particularly useful for incremental data loads, and proper partitioning helps performance.

What is the Alter Row Transformation?

The Alter Row Transformation is used to set row-level policies for data being written to a sink. This transformation is particularly useful when you are working with slowly changing dimensions (SCD) or when you need to synchronize data between source and sink systems.

Key Features

  1. Define Row Actions:
    • Insert: Add new rows.
    • Update: Modify existing rows.
    • Delete: Remove rows.
    • Upsert: Insert or update rows.
    • No Action: Ignore rows.
  2. Condition-Based Rules:
    • Define rules using expressions for each action.
  3. Works with Supported Sinks:
    • SQL Database, Delta Lake, and more.

How Does the Alter Row Transformation Work?

  1. Input Data: The transformation takes input data from a previous transformation in the data flow.
  2. Define Conditions: You define conditions for each action (insert, update, delete, upsert) using expressions.
  3. Output to Sink: The transformation passes the data to the sink, where the specified actions are performed based on the conditions.
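
For example (hypothetical rules): set Insert if to isNull(id) so rows without a key are inserted, Upsert if to true() to upsert every row, or Delete if to status == 'inactive' to remove retired rows. Conditions are evaluated in order, and the first match determines the row's policy.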

Preparing test data

We will focus on the Alter Row transformation's core concepts.

id CustID Product Quantity Amount
1  C1      A	  2	 20
2  C1      B	  3	 30
3  C2      C	  1	 10
4  C1      A	  2	 20
5  C3      A	  3	 30
6  C2      B	  1	 10
7  C3      C	  2	 20
8  C1      C	  3	 30
9  C1      A	  2	 20
10 C2      A	  1	 30
11 C3      C	  3	 10

Use Alter Row Transformation

Step 1: Create Data Flow

Create a Data Flow, add a source transformation and configure it.

Preview the source data.

Step 2: Add the Alter Row transformation

Alter row condition has 4 options:

  • Insert if
  • Update if
  • Delete if
  • Upsert if

Use the data flow expression builder to build each condition, then preview the output.

Note that the order of the conditions matters: actions are processed in the order defined.

Step 3: Add Sink transformation

Add a Sink Transformation, configure it.

Currently, the Sink transformation supports two kinds of datasets: inline datasets and dataset objects. Supported sinks include databases, Blob storage, ADLS, and Delta Lake (inline dataset); see the Microsoft documentation for the detailed list.

Inline datasets are recommended when you use flexible schemas, one-off sink instances, or parameterized sinks. If your sink is heavily parameterized, inline datasets allow you to not create a “dummy” object. Inline datasets are based in Spark, and their properties are native to data flow.

Dataset objects are reusable entities that can be used in other data flows and activities such as Copy. 

For this demo, we are using Delta, Inline dataset.

When the alter row policy allows delete, update, or upsert, we have to set a primary key (key columns) in the sink.

Use Data Flow in Pipeline

We have completed the data flow; it is ready to be used in a pipeline.

Create a pipeline

Create a pipeline and configure the data flow.

Now let's change the source data.

Execute the pipeline again and check the resulting delta table.

Conclusion

Notes

  • Actions are processed in the order defined.
  • Test rules with Data Preview.
  • Primary Key: The sink must have keys for updates and deletes. Ensure that your sink has a primary key defined, as it is required for update, delete, and upsert operations.

By using the Alter Row Transformation in ADF, you can efficiently manage data changes and ensure that your sink systems are always up-to-date with the latest data from your sources. This transformation is a powerful tool for data engineers working on ETL/ELT pipelines in Azure.

Please do not hesitate to contact me if you have any questions at William . Chen @ mainri.ca

(remove all space from the email account )

Data Flow: Aggregate Transformation

The Aggregate transformation in Azure Data Factory (ADF) Data Flows is a powerful tool for performing calculations on groups of data. It’s analogous to the GROUP BY clause in SQL, allowing you to summarize data based on one or more grouping columns.

Purpose

The Aggregate transformation allows you to:

  • Group data: Group rows based on the values in one or more specified columns.
  • Perform aggregations: Calculate aggregate values (like sum, average, count, min, max, etc.) for each group.

Key Features and Settings:

  • Group By: This section defines the columns by which the data will be grouped. You can select one or more columns. Rows with the same values in these columns will be grouped together.
  • Aggregates: This section defines the aggregations to be performed on each group. You specify:
    • New column name: The name of the resulting aggregated column.
    • Expression: The aggregation function and the column to which it’s applied.

Available Aggregate Functions

ADF Data Flows support a wide range of aggregate functions, including:

  • avg(column): Calculates the average of a column.
  • count(column) or count(*): Counts the number of rows in a group. count(*) counts all rows, even if some columns are null. count(column) counts only non-null values in the specified column.
  • max(column): Finds the maximum value in a column.
  • min(column): Finds the minimum value in a column.
  • sum(column): Calculates the sum of a column.
  • collect(column): Collects all values within a group into an array.
  • first(column): Returns the first value encountered in the group.
  • last(column): Returns the last value encountered in the group.
  • stddev(column): Calculates the standard deviation of a column.
  • variance(column): Calculates the variance of a column.

Preparing test data

With assumed ADF/Synapse expertise, we will focus on aggregate transformation core concepts.

sample dataset
CustID Product Quantity Amount
C1,     A,      2,      20
C1,     B,      3,      30
C2,     C,      1,      10
C1,     A,      2,      20
C3,     A,      3,      30
C2,     B,      1,      10
C3,     C,      2,      20
C1,     C,      3,      30
C1,     A,      2,      20
C2,     A,      1,      30
C3,     C,      3,      10

Create Data Flow

Configure Source

Add Aggregate Transformation

The functionality of the Aggregate transformation is equivalent to that of the GROUP BY clause in T-SQL.

In SQL, we would write this query:

select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount 
, avg(amount) as Avg_price
from sales group by product;

and get this result:
product  sold_times  sold_items  sold_amount  Avg_price
A        5           10          120          24.0
B        2           4           40           20.0
C        4           9           70           17.5

We configure the Aggregate transformation in the same way, using the expression builder to write each aggregate expression.

It performs the same grouping and aggregation operations as TSQL’s GROUP BY.
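
Since data flows execute on Spark, you can sanity-check the same aggregation in PySpark; a minimal sketch using the sample rows above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("AggregateDemo").getOrCreate()

data = [("C1", "A", 2, 20), ("C1", "B", 3, 30), ("C2", "C", 1, 10),
        ("C1", "A", 2, 20), ("C3", "A", 3, 30), ("C2", "B", 1, 10),
        ("C3", "C", 2, 20), ("C1", "C", 3, 30), ("C1", "A", 2, 20),
        ("C2", "A", 1, 30), ("C3", "C", 3, 10)]
df = spark.createDataFrame(data, ["CustID", "Product", "Quantity", "Amount"])

(df.groupBy("Product")
   .agg(F.count("Quantity").alias("sold_times"),
        F.sum("Quantity").alias("sold_items"),
        F.sum("Amount").alias("sold_amount"),
        F.avg("Amount").alias("Avg_price"))
   .orderBy("Product")
   .show())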

Important Considerations

  • Null Handling: Pay attention to how aggregate functions handle null values. For example, sum() ignores nulls, while count(column) only counts non-null values.
  • Data Types: Ensure that the data types of the columns you’re aggregating are compatible with the chosen aggregate functions.
  • Performance: For large datasets, consider partitioning your data before the Aggregate transformation to improve performance.
  • Distinct Count: For calculating distinct counts, use the countDistinct(column) function.

Conclusion

By using the Aggregate transformation effectively, you can efficiently summarize and analyze your data within ADF Data Flows. Remember to carefully consider the appropriate aggregate functions and grouping columns to achieve your desired results.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account )