Data Flow: Aggregate Transformation

The Aggregate transformation in Azure Data Factory (ADF) Data Flows is a powerful tool for performing calculations on groups of data. It’s analogous to the GROUP BY clause in SQL, allowing you to summarize data based on one or more grouping columns.

Purpose

The Aggregate transformation allows you to:

  • Group data: Group rows based on the values in one or more specified columns.
  • Perform aggregations: Calculate aggregate values (like sum, average, count, min, max, etc.) for each group.

Key Features and Settings:

  • Group By: This section defines the columns by which the data will be grouped. You can select one or more columns. Rows with the same values in these columns will be grouped together.
  • Aggregates: This section defines the aggregations to be performed on each group. You specify:
    • New column name: The name of the resulting aggregated column.
    • Expression: The aggregation function and the column to which it’s applied.

Available Aggregate Functions

ADF Data Flows support a wide range of aggregate functions, including:

  • avg(column): Calculates the average of a column.
  • count(column) or count(*): Counts the number of rows in a group. count(*) counts all rows, even if some columns are null. count(column) counts only non-null values in the specified column.
  • max(column): Finds the maximum value in a column.
  • min(column): Finds the minimum value in a column.
  • sum(column): Calculates the sum of a column.
  • collect(column): Collects all values within a group into an array.
  • first(column): Returns the first value encountered in the group.
  • last(column): Returns the last value encountered in the group.
  • stddev(column): Calculates the standard deviation of a column.
  • variance(column): Calculates the variance of a column.

Preparing test data

With assumed ADF/Synapse expertise, we will focus on aggregate transformation core concepts.

sample dataset
CustID Product Quantity Amount
C1,     A,      2,      20
C1,     B,      3,      30
C2,     C,      1,      10
C1,     A,      2,      20
C3,     A,      3,      30
C2,     B,      1,      10
C3,     C,      2,      20
C1,     C,      3,      30
C1,     A,      2,      20
C2,     A,      1,      30
C3,     C,      3,      10

Create Data Flow

Configure Source

Add Aggregate Transformation

he functionality of aggregate transformations is equivalent to that of the GROUP BY clause in T-SQL.

in SQL script, we write this query:

select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount 
, avg(amount) as Avg_price
from sales group by product;

get this result
product	sold_times  sold_items  sold_amount   Avg_price
A	   10		6	 120	      24.0
B	   4		12	 40	      20.0
C	   9		3	 70	      17.5

Using Aggregate transformation in this way.

we can use “expression builder” to write the expression

It performs the same grouping and aggregation operations as TSQL’s GROUP BY.

Important Considerations

  • Null Handling: Pay attention to how aggregate functions handle null values. For example, sum() ignores nulls, while count(column) only counts non-null values.
  • Data Types: Ensure that the data types of the columns you’re aggregating are compatible with the chosen aggregate functions.
  • Performance: For large datasets, consider partitioning your data before the Aggregate transformation to improve performance.
  • Distinct Count: For calculating distinct counts, use the countDistinct(column) function.

Conclusion

By using the Aggregate transformation effectively, you can efficiently summarize and analyze your data within ADF Data Flows. Remember to carefully consider the appropriate aggregate functions and grouping columns to achieve your desired results.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account )

Using Exists Transformation for Data Comparison in Azure Data Factory/Synapse

In this article, I will discuss on the Exists Transformation of Data Flow. The exists transformation is a row filtering transformation that checks whether your data exists in another source or stream. The output stream includes all rows in the left stream that either exist or don’t exist in the right stream. The exists transformation is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

I use the Exists transformation in Azure Data Factory or Synapse data flows to compare source and target data.” (This is the most straightforward and generally preferred option.

Create a Data Flow

Create a Source

Create a DerivedColumn Transformation

expression uses : sha2(256, columns())

Create target and derivedColumn transformation

The same way of source creates target. To keep the data type are the same so that we can use hash value to compare, I add a “Cast transformation”;

then the same as source setting, add a derivedColumn transformation.

Exists Transformation to compare Source and target

add a Exists to comparing source and target.

The Exists function offers two options: Exists and Doesn’t Exist. It supports multiple criteria and custom expressions.

Configuration

  1. Choose which data stream you’re checking for existence in the Right stream dropdown.
  2. Specify whether you’re looking for the data to exist or not exist in the Exist type setting.
  3. Select whether or not your want a Custom expression.
  4. Choose which key columns you want to compare as your exists conditions. By default, data flow looks for equality between one column in each stream. To compare via a computed value, hover over the column dropdown and select Computed column.

“Exists” option

Now, let use “Exists” option

we got this depid = 1004 exists.

Doesn’t Exist

use “Doesn’t Exist” option

we got depid = 1003. wholessale exists in Source side, but does NOT exist in target.

Recap

The “Exists Transformation” is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

It is very convenient to compare in data engineering project, e.g. ETL comparison.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Change Data Capture with Azure Data Factory and Synapse Analytics

When we perform data integration and ETL processes, the most effective way is only read the source data that has changed since the last time the pipeline ran, rather than always querying an entire dataset on each run.

We will explore the different Change Data Capture (CDC) capabilities (CDC in Mapping Data flowTop level CDC in ADFSynapse link) available in Azure Data Factory and Azure Synapse Analytics.

Support data source and target

currently, ADF support the following data source and target

Supported data sources

  • Avro
  • Azure Cosmos DB (SQL API)
  • Azure SQL Database
  • Azure SQL Managed Instance
  • Delimited Text
  • JSON
  • ORC
  • Parquet
  • SQL Server
  • XML
  • Snowflake

Supported targets

  • Avro
  • Azure SQL Database
  • SQL Managed Instance
  • Delimited Text
  • Delta
  • JSON
  • ORC
  • Parquet
  • Azure Synapse Analytics

Azure Synapse Analytics as Target

When using Azure Synapse Analytics as target, the Staging Settings is available on the main table canvas. Enabling staging is mandatory when selecting Azure Synapse Analytics as the target. 

Staging Settings can be configured in two ways: utilizing Factory settings or opting for a Custom settingsFactory settings apply at the factory level. For the first time, if these settings aren’t configured, you’ll be directed to the global staging setting section for configuration. Once set, all CDC top-level resources will adopt this configuration. Custom settings is scoped only for the CDC resource for which it is configured and overrides the Factory settings.

Known limitations

  • Currently, when creating source/target mappings, each source and target is only allowed to be used once.
  • Complex types are currently unsupported.
  • Self-hosted integration runtime (SHIR) is currently unsupported.

CDC ADLS to SQL Database

Create a CDC artifact

Go to the Author pane in data factory. Below Pipelines, a new top-level artifact called Change Data Capture (preview) appears.

Configuring Source properties

Use the dropdown list to choose your data source. For this demo, select DelimitedText.

To support Change Data Capture (CDC), it’s recommended to create a dedicated Linked Service, as current implementations use a single Linked Service for both source and target.

You can choose to add multiple source folders by using the plus (+) button. The other sources must also use the same linked service that you already selected.

Configuring target

This demo uses a SQL database and a dedicated Linked Service for CDC.

configuring the target table

If existing tables at the target have matching names, they’re selected by default under Existing entities. If not, new tables with matching names are created under New entities. Additionally, you can edit new tables by using the Edit new tables button.

capturing change data studio appears

let’s click the “columns mapping”

If you want to enable the column mappings, select the mappings and turn off the Auto map toggle. Then, select the Column mappings button to view the mappings. You can switch back to automatic mapping anytime by turning on the Auto map toggle.

Configure CDC latency

After your mappings are complete, set your CDC latency by using the Set Latency button.

Publish and starting CDC

After you finish configuring your CDC, select Publish all to publish your changes, then Start to start running your change data capture.

Monitoring CDC

For monitoring CDC, we can either from ADF’s studio’s monitor or from CDC studio

Once data changed, CDC will automatically detecting and tracking data changing, deliver to target

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Azure Data Factory or Synapse Copy Activity with File System

In Azure Data Factory (ADF) or Synapse, using Copy Activity with a File System as a source or sink is common when dealing with on-premises file systems, network file shares, or even cloud-based file storage systems. Here’s an overview of how it works, key considerations, and steps to set it up.

Key Components and setup with File System:

Create a File System Linked Service

Linked Service: For on-premises or network file systems, you typically need a Self-hosted Integration Runtime (SHIR).

Fill in the required fields:

  • Connection: Specify the file system type (e.g., network share or local path).
  • Authentication: Provide the appropriate credentials, such as username/password, or key-based authentication.
  • If the file system is on-premises, configure the Self-hosted Integration Runtime to access it.

Create File System Dataset

Go to Datasets in ADF and create a new dataset. Select File System as the data source.

Configure the dataset to point to the correct file or folder:

  • Specify the File Path.
  • Define the file format (e.g., CSV, JSON, XML).
  • Set any schema information if required (for structured data like CSV).

Considerations:

  • Integration Runtime: For on-premises file systems, the Self-hosted Integration Runtime (SHIR) is essential to securely move data from private networks.
  • Performance: Data transfer speeds depend on network configurations (for on-prem) and ADF’s parallelism settings.
  • File Formats: Ensure proper handling of different file formats (e.g., CSV, JSON, Binary etc.) and schema mapping for structured files.
  • Security: Ensure credentials and network configurations are correctly set up, and consider encryption if dealing with sensitive data.

Common Errors:

  • Connection issues: If the SHIR is not correctly configured, or if there are issues with firewall or network settings, ADF may not be able to access the file system.
  • Permission issues: Ensure that the correct permissions are provided to access the file system (file share, SMB, FTP, etc.).

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Create External Data Sources in Synapse Serverless SQL

An external data source in Synapse serverless SQL is typically used to reference data stored outside of the SQL pool, such as in Azure Data Lake Storage (ADLS) or Blob Storage. This allows you to query data directly from these external sources using T-SQL.

There are different ways to create external data source. Using Synapse Studio UI, coding etc. the easiest way is to leverage Synapse Studio UI. But we had better know how to use code to create it since in some cases we have to use this way.

Here’s how to create an external data source in Synapse serverless SQL

Using Synapse Studio UI to create External Data Source

Create Lake Database

Open Synapse Studio

On the left side, select Data portal > workspace

Fill in the properties:

Create external table from data lake

Double clicks the Lake Database you just created.

in the Lake Database tag, click “+ Table”

fill in the detail information:

Continue to configure the table properyies

Adjust Table properties

Adjust column other properties, or add even more columns, such as data type, description, Nullability, Primary Key, set up partition create relationship …… etc.

Repeat the above steps to create even more tables to meet your business logic need, or create relationship if need.

Script to create an External Data Source

Step 1:

1. Connect to Serverless SQL Pool:

Open Synapse Studio, go to the “Data” hub, and connect to your serverless SQL pool.

2. Create the External Data Source:

Use the following T-SQL script to create an external data source that points to your Azure Data Lake Storage (ADLS) or Blob Storage:

CREATE EXTERNAL DATA SOURCE MyExternalDataSource
WITH (
LOCATION = ‘https://<your-storage-account-name>.dfs.core.windows.net/<your-filesystem-name>‘,
CREDENTIAL = <your-credential-name>
);

Replace <your-storage-account-name>, <your-filesystem-name>, and <your-credential-name> with the appropriate values:

  • LOCATION: The URL of your Azure Data Lake Storage (ADLS) or Blob Storage.
  • CREDENTIAL: The name of the database credential used to access the storage. (You may need to create this credential if it doesn’t already exist.)

Step 2:

If you don’t have a credential yet, create one as follows:

1. Create a Credential:

CREATE DATABASE SCOPED CREDENTIAL MyStorageCredential
WITH IDENTITY = ‘SHARED ACCESS SIGNATURE’,
SECRET = ”;

Replace <your-SAS-token> with your Azure Storage Shared Access Signature (SAS) token.

2. Create an External Table or Query the External Data

After setting up the external data source, you can create external tables or directly query data:

Create an External Table:

You can create an external table that maps to the data in your external storage:

CREATE EXTERNAL TABLE MyExternalTable (
Column1 INT,
Column2 NVARCHAR(50),
Column3 DATETIME
)
WITH (
LOCATION = ‘/path/to/data.csv’,
DATA_SOURCE = MyExternalDataSource,
FILE_FORMAT = MyFileFormat — You need to define a file format
);

Query the External Data

You can also directly query the data without creating an external table:

SELECT *
FROM OPENROWSET(
BULK ‘/path/to/data.csv’,
DATA_SOURCE = ‘MyExternalDataSource’,
FORMAT = ‘CSV’,
FIELDTERMINATOR = ‘,’,
ROWTERMINATOR = ‘\n’
) AS MyData;

Create and Use a File Format (Optional)

If you are querying structured files (like CSV, Parquet), you might need to define a file format:

CREATE EXTERNAL FILE FORMAT MyFileFormat
WITH (
FORMAT_TYPE = DELIMITEDTEXT,
FORMAT_OPTIONS (FIELD_TERMINATOR = ‘,’, STRING_DELIMITER = ‘”‘)
);

Summary

By following these steps, you should be able to connect to and query your external data sources using the serverless SQL pool in Synapse. Let me know if you need further assistance!

  • Create an external data source in Synapse serverless SQL to point to your external storage.
  • Create a database scoped credential if necessary to access your storage.
  • Create an external table or directly query data using OPENROWSET.
  • Define a file format if working with structured data like CSV or Parquet.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Day 9: Managed attributes in Data Map

With “Managed Attributes” we can add own attributes (groups of attributes) and provide data stewards with the functionality to improve the content of data catalog.

  • create a new attribute group
  • create a new attribute
  • learn more about the available field types
  • assign and manage attributes for your data assets

Create a new attribute group

Create attribute group if there is no attribute group

Purview studio > Data Map > Managed attributes > New attribute group

Fill in

Create a new attribute

File in those fields
For field group: There are those can be selected

For applicable asset types, many options out of box to be used

Now, new attributes created

In the managed attribute management experience, managed attributes can’t be deleted, only expired. Expired attributes can’t be applied to any assets and are, by default, hidden in the user experience. Once an attribute created, it cannot change. Only mark them as “expired” and create a new, undated one.

Add value for managed attribute

Once a managed attribute has been created, you’ll need to add a value for each of your assets. You can add values to your assets by:

  1. Search for your data asset in the Microsoft Purview Data Catalog
  2. On the overview for your asset, you should see the managed attributes section with all attributes that have values. (You can see attributes without values by using the Show attributes without a value toggle.)
  3. Select the Edit button.

Under Managed attributes, add values for each of your attributes.

If any attributes are Required you will not be able to save until you’ve added a value for that attribute.

Now, managed attribute added

Summary

Managed attribute: A set of user-defined attributes that provide a business or organization level context to an asset. A managed attribute has a name and a value. For example, ‘Department’ is an attribute name and ‘Finance’ is its value. Attribute group: A grouping of managed attributes that allow for easier organization and consumption.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Next step: Day 10 – Collections access control and management

Day 8 – Data Lineage, Extract SQL, ADF and Synapse Pipeline Lineage

Microsoft Purview provides an overview of data lineage in the Data Catalog. It also details how data systems can integrate with the catalog to capture lineage of data.

Lineage is represented visually to show data moving from source to destination including how the data was transformed. Given the complexity of most enterprise data environments.

Microsoft Purview supports lineage for views and stored procedures from Azure SQL Database. While lineage for views is supported as part of scanning, you will need to turn on the Lineage extraction toggle to extract stored procedure lineage when you’re setting up a scan.

Lineage collection

Metadata collected in Microsoft Purview from enterprise data systems are stitched across to show an end to end data lineage. Data systems that collect lineage into Microsoft Purview are broadly categorized into following three types:

  • Data processing systems
  • Data storage systems
  • Data analytics and reporting systems

Each system supports a different level of lineage scope.  

Data estate might include systems doing data extraction, transformation (ETL/ELT systems), analytics, and visualization systems. Each of the systems captures rich static and operational metadata that describes the state and quality of the data within the systems boundary. The goal of lineage in a data catalog is to extract the movement, transformation, and operational metadata from each data system at the lowest grain possible.

The following example is a typical use case of data moving across multiple systems, where the Data Catalog would connect to each of the systems for lineage.

  • Data Factory copies data from on-prem/raw zone to a landing zone in the cloud.
  • Data processing systems like Synapse, Databricks would process and transform data from landing zone to Curated zone using notebooks.
  • Further processing of data into analytical models for optimal query performance and aggregation.
  • Data visualization systems will consume the datasets and process through their meta model to create a BI Dashboard, ML experiments and so on.

Lineage for SQL DB views

Starting 6/30/24, SQL DB metadata scan will include lineage extraction for views. Only new scans will include the view lineage extraction. Lineage is extracted at all scan levels (L1/L2/L3). In case of an incremental scan, whatever metadata is scanned as part of incremental scan, the corresponding static lineage for tables/views will be extracted.

Prerequisites for setting up a scan with Stored Procedure lineage extraction

<Purview-Account> can access SQL Database and in db_owner group

To check whether the Account Exists in the Database


SELECT name, type_desc
FROM sys.database_principals
WHERE name = 'YourUserName';

Replace ‘YourUserName’ with the actual username you’re checking for.

If the user exists, it will return the name and type (e.g., SQL_USER or WINDOWS_USER).

If it does not exist, create one.

Sign in to Azure SQL Database with your Microsoft Entra account, create a <Purview-account> account and assign db_owner permissions to the Microsoft Purview managed identity.

You can review my previous article Configuring Azure Entra ID Authentication in Azure SQL Database If you are not sure how to enable Azure Entra ID login.


Create user <purview-account> FROM EXTERNAL PROVIDER
GO
EXEC sp_addrolemember 'db_owner', <purview-account> 
GO

replace <purview-account> with the actual purview account name.

Master Key

Check whether master exists or not.

To check if the Database Master Key (DMK) exists or not


SELECT * FROM sys.symmetric_keys
WHERE name = '##MS_DatabaseMasterKey##';Create master key
Go

if the query returns a result, it means the Database Master Key already exists.

If no rows are returned, it means the Database Master Key does not exist, and you may need to create one if required for encryption-related operations.

Create a master key


Create master key
Go

Allow Azure services and resources to access this server 

Ensure that Allow Azure services and resources to access this server is enabled under networking/firewall for your Azure SQL resource.

Previously, we have discussed create a scan for Azure SQL Database at Registering Azure SQL Database and Scan in Purview, that scan progress is disabled “Lineage extraction” in that article.

To allow purview extract lineage, we need set to on

Extract Azure Data Factory/Synapse pipeline lineage

When we connect an Azure Data Factory to Microsoft Purview, whenever a supported Azure Data Factory activity is run, metadata about the activity’s source data, output data, and the activity will be automatically ingested into the Microsoft Purview Data Map.

Microsoft Purview captures runtime lineage from the following Azure Data Factory activities:

  • Copy Data
  • Data Flow
  • Execute SSIS Package

If a data source has already been scanned and exists in the data map, the ingestion process will add the lineage information from Azure Data Factory to that existing source. If the source or output doesn’t exist in the data map and is supported by Azure Data Factory lineage Microsoft Purview will automatically add their metadata from Azure Data Factory into the data map under the root collection.

This can be an excellent way to monitor your data estate as users move and transform information using Azure Data Factory.

Connect to Microsoft Purview account in Data Factory

Set up authentication

Data factory’s managed identity is used to authenticate lineage push operations from data factory to Microsoft Purview. Grant the data factory’s managed identity Data Curator role on Microsoft Purview root collection.

Purview > Management > Lineage connections > Data Factory > new

Validation: Purview > Data map > Collection > Root collection > Role assignments >

Check, the ADF is under “data Curators” section. That’s OK

ADF connect to purview

In the ADF studio: Manage -> Microsoft Purview, and select Connect to a Microsoft Purview account

We will see this

Once pipeline successfully runs, activity will be caught, extracted lineage look this.

that’s all for extracting ADF pipeline lineage.

Next step: Day 9 – Managed attributes in Data Map

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)