Data Flow: Alter Row Transformation

The Alter Row transformation in Azure Data Factory (ADF) sets row-level policies in a mapping data flow, marking rows for insert, update, delete, or upsert. You define a condition for each operation, and the sink applies the resulting changes to the destination dataset. It works with sinks that support CRUD operations (for example, SQL-based sinks and Delta Lake), which makes it especially useful for incremental data loads. Map input columns to target columns, use expressions for the conditional logic, and apply proper partitioning to keep performance healthy.

What is the Alter Row Transformation?

The Alter Row Transformation is used to set row-level policies for data being written to a sink. This transformation is particularly useful when you are working with slowly changing dimensions (SCD) or when you need to synchronize data between source and sink systems.

Key Features

  1. Define Row Actions:
    • Insert: Add new rows.
    • Update: Modify existing rows.
    • Delete: Remove rows.
    • Upsert: Insert or update rows.
    • No Action: Ignore rows.
  2. Condition-Based Rules:
    • Define rules using expressions for each action.
  3. Works with Supported Sinks:
    • SQL Database, Delta Lake, and more.

How Does the Alter Row Transformation Work?

  1. Input Data: The transformation takes input data from a previous transformation in the data flow.
  2. Define Conditions: You define conditions for each action (insert, update, delete, upsert) using expressions.
  3. Output to Sink: The transformation passes the data to the sink, where the specified actions are performed based on the conditions.
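As an illustration, the data flow script that ADF generates behind the designer for an Alter Row transformation follows this general shape (a minimal sketch; the stream names and conditions are placeholders, not taken from this demo):

source1 alterRow(
    insertIf(<condition>),
    updateIf(<condition>),
    deleteIf(<condition>),
    upsertIf(<condition>)
) ~> AlterRow1

Each condition is a data flow expression evaluated per row, in the order listed.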

Preparing test data

We will focus on the core concepts of the Alter Row transformation.

id  CustID  Product  Quantity  Amount
1   C1      A        2         20
2   C1      B        3         30
3   C2      C        1         10
4   C1      A        2         20
5   C3      A        3         30
6   C2      B        1         10
7   C3      C        2         20
8   C1      C        3         30
9   C1      A        2         20
10  C2      A        1         30
11  C3      C        3         10

Use Alter Row Transformation

Step 1: Create Data Flow

Create a Data Flow, add a source transformation and configure it.

Preview the source data.

Step 2: Add the Alter Row Transformation

The Alter Row transformation offers four condition options:

  • Insert if
  • Update if
  • Delete if
  • Upsert if

Use the data flow expression builder to build each condition.
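For example, to treat every incoming row as an upsert (a common pattern when syncing a full extract into a sink), the Upsert if condition can simply be a constant expression; a condition on the demo columns works the same way. These are illustrations, not necessarily the exact conditions used in this demo:

Upsert if:  true()
Delete if:  Amount == 0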

Preview its output.

Pay attention to the order of the conditions: they are evaluated in the order defined, and the first matching condition determines the row's action.

Step 3: Add Sink transformation

Add a Sink transformation and configure it.

Currently, the Sink transformation supports two dataset types: inline datasets and dataset objects. Supported stores include databases, Blob storage, ADLS, and Delta Lake (inline dataset only); see the Microsoft documentation for the detailed list.

Inline datasets are recommended when you use flexible schemas, one-off sink instances, or parameterized sinks. If your sink is heavily parameterized, inline datasets allow you to avoid creating a “dummy” dataset object. Inline datasets are based on Spark, and their properties are native to data flow.

Dataset objects are reusable entities that can be used in other data flows and activities such as Copy. 

For this demo, we use a Delta inline dataset.

When the alter row policy allows delete, update, or upsert, we have to set the key column(s) in the sink.
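For reference, a Delta inline sink with alter-row policies enabled looks roughly like this in data flow script; the key column (id) and the storage paths are assumptions for illustration, not copied from the demo:

AlterRow1 sink(
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'delta',
    fileSystem: 'demo-container',
    folderPath: 'delta/sales',
    insertable: true,
    updateable: true,
    deletable: true,
    upsertable: true,
    keys: ['id']
) ~> sink1

The keys list is what the sink uses to match incoming rows against existing rows for update, delete, and upsert.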

Use Data Flow in Pipeline

With the data flow completed, it is ready to be used in a pipeline.

Create a pipeline

Create a pipeline and configure the data flow.

Let’s change the source data.

Execute the pipeline again and check the Delta table result.

Conclusion

Notes

  • Actions are processed in the order defined.
  • Test rules with Data Preview.
  • Primary Key: The sink must have key columns defined, as they are required for update, delete, and upsert operations.

By using the Alter Row Transformation in ADF, you can efficiently manage data changes and ensure that your sink systems are always up-to-date with the latest data from your sources. This transformation is a powerful tool for data engineers working on ETL/ELT pipelines in Azure.

Please do not hesitate to contact me if you have any questions at William . Chen @ mainri.ca

(remove all space from the email account )

Data Flow: Aggregate Transformation

The Aggregate transformation in Azure Data Factory (ADF) Data Flows is a powerful tool for performing calculations on groups of data. It’s analogous to the GROUP BY clause in SQL, allowing you to summarize data based on one or more grouping columns.

Purpose

The Aggregate transformation allows you to:

  • Group data: Group rows based on the values in one or more specified columns.
  • Perform aggregations: Calculate aggregate values (like sum, average, count, min, max, etc.) for each group.

Key Features and Settings:

  • Group By: This section defines the columns by which the data will be grouped. You can select one or more columns. Rows with the same values in these columns will be grouped together.
  • Aggregates: This section defines the aggregations to be performed on each group. You specify:
    • New column name: The name of the resulting aggregated column.
    • Expression: The aggregation function and the column to which it’s applied.

Available Aggregate Functions

ADF Data Flows support a wide range of aggregate functions, including:

  • avg(column): Calculates the average of a column.
  • count(column) or count(*): Counts the number of rows in a group. count(*) counts all rows, even if some columns are null. count(column) counts only non-null values in the specified column.
  • max(column): Finds the maximum value in a column.
  • min(column): Finds the minimum value in a column.
  • sum(column): Calculates the sum of a column.
  • collect(column): Collects all values within a group into an array.
  • first(column): Returns the first value encountered in the group.
  • last(column): Returns the last value encountered in the group.
  • stddev(column): Calculates the standard deviation of a column.
  • variance(column): Calculates the variance of a column.

Preparing test data

Assuming familiarity with ADF/Synapse, we will focus on the core concepts of the Aggregate transformation.

sample dataset
CustID Product Quantity Amount
C1,     A,      2,      20
C1,     B,      3,      30
C2,     C,      1,      10
C1,     A,      2,      20
C3,     A,      3,      30
C2,     B,      1,      10
C3,     C,      2,      20
C1,     C,      3,      30
C1,     A,      2,      20
C2,     A,      1,      30
C3,     C,      3,      10

Create Data Flow

Configure Source

Add Aggregate Transformation

The functionality of aggregate transformations is equivalent to that of the GROUP BY clause in T-SQL.

In SQL, we would write this query:

select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount 
, avg(amount) as Avg_price
from sales group by product;

The query returns this result:

product  sold_times  sold_items  sold_amount  Avg_price
A        5           10          120          24.0
B        2           4           40           20.0
C        4           9           70           17.5

Using the Aggregate transformation, we configure the same grouping and aggregations.

We can use the expression builder to write each aggregate expression.

It performs the same grouping and aggregation operations as T-SQL’s GROUP BY.
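As a sketch, the equivalent Aggregate transformation expressed in data flow script looks like this (the stream names are placeholders):

source1 aggregate(
    groupBy(Product),
    sold_times = count(Quantity),
    sold_items = sum(Quantity),
    sold_amount = sum(Amount),
    Avg_price = avg(Amount)
) ~> Aggregate1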

Important Considerations

  • Null Handling: Pay attention to how aggregate functions handle null values. For example, sum() ignores nulls, while count(column) only counts non-null values.
  • Data Types: Ensure that the data types of the columns you’re aggregating are compatible with the chosen aggregate functions.
  • Performance: For large datasets, consider partitioning your data before the Aggregate transformation to improve performance.
  • Distinct Count: For calculating distinct counts, use the countDistinct(column) function, as shown below.
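For example, counting how many distinct customers bought each product (using the sample dataset's columns) could be written as:

distinct_customers = countDistinct(CustID)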

Conclusion

By using the Aggregate transformation effectively, you can efficiently summarize and analyze your data within ADF Data Flows. Remember to carefully consider the appropriate aggregate functions and grouping columns to achieve your desired results.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account )

Change Data Capture with Azure Data Factory and Synapse Analytics

When we perform data integration and ETL processes, the most efficient approach is to read only the source data that has changed since the last pipeline run, rather than querying the entire dataset on each run.

We will explore the different Change Data Capture (CDC) capabilities available in Azure Data Factory and Azure Synapse Analytics: CDC in Mapping Data Flow, top-level CDC in ADF, and Synapse Link.

Supported data sources and targets

Currently, ADF supports the following data sources and targets.

Supported data sources

  • Avro
  • Azure Cosmos DB (SQL API)
  • Azure SQL Database
  • Azure SQL Managed Instance
  • Delimited Text
  • JSON
  • ORC
  • Parquet
  • SQL Server
  • XML
  • Snowflake

Supported targets

  • Avro
  • Azure SQL Database
  • SQL Managed Instance
  • Delimited Text
  • Delta
  • JSON
  • ORC
  • Parquet
  • Azure Synapse Analytics

Azure Synapse Analytics as Target

When using Azure Synapse Analytics as the target, the Staging Settings section is available on the main table canvas. Enabling staging is mandatory when Azure Synapse Analytics is selected as the target.

Staging settings can be configured in two ways: Factory settings or Custom settings. Factory settings apply at the factory level; the first time, if these settings aren’t configured, you’ll be directed to the global staging settings section to configure them, and once set, all top-level CDC resources adopt this configuration. Custom settings are scoped to the CDC resource for which they are configured and override the Factory settings.

Known limitations

  • Currently, when creating source/target mappings, each source and target is only allowed to be used once.
  • Complex types are currently unsupported.
  • Self-hosted integration runtime (SHIR) is currently unsupported.

CDC ADLS to SQL Database

Create a CDC artifact

Go to the Author pane in data factory. Below Pipelines, a new top-level artifact called Change Data Capture (preview) appears.

Configuring Source properties

Use the dropdown list to choose your data source. For this demo, select DelimitedText.

To support Change Data Capture (CDC), it’s recommended to create a dedicated Linked Service, as current implementations use a single Linked Service for both source and target.

You can choose to add multiple source folders by using the plus (+) button. The other sources must also use the same linked service that you already selected.

Configuring target

This demo uses a SQL database and a dedicated Linked Service for CDC.

Configuring the target table

If existing tables at the target have matching names, they’re selected by default under Existing entities. If not, new tables with matching names are created under New entities. Additionally, you can edit new tables by using the Edit new tables button.

The capturing change data studio appears.

Let’s click Column mappings.

If you want to enable the column mappings, select the mappings and turn off the Auto map toggle. Then, select the Column mappings button to view the mappings. You can switch back to automatic mapping anytime by turning on the Auto map toggle.

Configure CDC latency

After your mappings are complete, set your CDC latency by using the Set Latency button.

Publish and start the CDC

After you finish configuring your CDC, select Publish all to publish your changes, then Start to start running your change data capture.

Monitoring CDC

To monitor the CDC, we can use either the ADF studio's Monitor hub or the CDC studio.

Once the source data changes, CDC automatically detects and tracks the changes and delivers them to the target.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Day 8 – Data Lineage, Extract SQL, ADF and Synapse Pipeline Lineage

Microsoft Purview provides an overview of data lineage in the Data Catalog. It also details how data systems can integrate with the catalog to capture lineage of data.

Lineage is represented visually to show data moving from source to destination, including how the data was transformed, which is especially valuable given the complexity of most enterprise data environments.

Microsoft Purview supports lineage for views and stored procedures from Azure SQL Database. While lineage for views is supported as part of scanning, you will need to turn on the Lineage extraction toggle to extract stored procedure lineage when you’re setting up a scan.

Lineage collection

Metadata collected in Microsoft Purview from enterprise data systems is stitched together to show end-to-end data lineage. Data systems that collect lineage into Microsoft Purview are broadly categorized into the following three types:

  • Data processing systems
  • Data storage systems
  • Data analytics and reporting systems

Each system supports a different level of lineage scope.  

A data estate might include systems doing data extraction and transformation (ETL/ELT), analytics, and visualization. Each of these systems captures rich static and operational metadata that describes the state and quality of the data within the system's boundary. The goal of lineage in a data catalog is to extract the movement, transformation, and operational metadata from each data system at the lowest grain possible.

The following example is a typical use case of data moving across multiple systems, where the Data Catalog would connect to each of the systems for lineage.

  • Data Factory copies data from on-prem/raw zone to a landing zone in the cloud.
  • Data processing systems like Synapse, Databricks would process and transform data from landing zone to Curated zone using notebooks.
  • Further processing of data into analytical models for optimal query performance and aggregation.
  • Data visualization systems will consume the datasets and process through their meta model to create a BI Dashboard, ML experiments and so on.

Lineage for SQL DB views

Starting 6/30/24, SQL DB metadata scan will include lineage extraction for views. Only new scans will include the view lineage extraction. Lineage is extracted at all scan levels (L1/L2/L3). In case of an incremental scan, whatever metadata is scanned as part of incremental scan, the corresponding static lineage for tables/views will be extracted.

Prerequisites for setting up a scan with Stored Procedure lineage extraction

The <Purview-Account> must be able to access the SQL database and must be a member of the db_owner role.

To check whether the account exists in the database:


SELECT name, type_desc
FROM sys.database_principals
WHERE name = 'YourUserName';

Replace ‘YourUserName’ with the actual username you’re checking for.

If the user exists, it will return the name and type (e.g., SQL_USER or WINDOWS_USER).

If it does not exist, create one.

Sign in to Azure SQL Database with your Microsoft Entra account, create a <Purview-account> account and assign db_owner permissions to the Microsoft Purview managed identity.

If you are not sure how to enable Entra ID login, you can review my previous article, Configuring Azure Entra ID Authentication in Azure SQL Database.


CREATE USER [<purview-account>] FROM EXTERNAL PROVIDER;
GO
EXEC sp_addrolemember 'db_owner', [<purview-account>];
GO

Replace <purview-account> with your actual Purview account name.
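Optionally, you can verify the role assignment afterwards with a standard T-SQL check (this query is generic, not Purview-specific):

-- List db_owner members to confirm the Purview account was added
SELECT r.name AS role_name, m.name AS member_name
FROM sys.database_role_members AS rm
JOIN sys.database_principals AS r ON rm.role_principal_id = r.principal_id
JOIN sys.database_principals AS m ON rm.member_principal_id = m.principal_id
WHERE r.name = 'db_owner';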

Master Key

To check whether the Database Master Key (DMK) exists:


SELECT * FROM sys.symmetric_keys
WHERE name = '##MS_DatabaseMasterKey##';

If the query returns a result, the Database Master Key already exists.

If no rows are returned, it means the Database Master Key does not exist, and you may need to create one if required for encryption-related operations.

Create a master key


Create master key
Go

Allow Azure services and resources to access this server 

Ensure that Allow Azure services and resources to access this server is enabled under networking/firewall for your Azure SQL resource.

Previously, we discussed creating a scan for Azure SQL Database in Registering Azure SQL Database and Scan in Purview; in that article, the scan was configured with Lineage extraction disabled.

To allow Purview to extract lineage, we need to set the Lineage extraction toggle to On.

Extract Azure Data Factory/Synapse pipeline lineage

When we connect an Azure Data Factory to Microsoft Purview, whenever a supported Azure Data Factory activity is run, metadata about the activity’s source data, output data, and the activity will be automatically ingested into the Microsoft Purview Data Map.

Microsoft Purview captures runtime lineage from the following Azure Data Factory activities:

  • Copy Data
  • Data Flow
  • Execute SSIS Package

If a data source has already been scanned and exists in the data map, the ingestion process will add the lineage information from Azure Data Factory to that existing source. If the source or output doesn’t exist in the data map and is supported by Azure Data Factory lineage, Microsoft Purview will automatically add its metadata from Azure Data Factory into the data map under the root collection.

This can be an excellent way to monitor your data estate as users move and transform information using Azure Data Factory.

Connect to Microsoft Purview account in Data Factory

Set up authentication

Data factory’s managed identity is used to authenticate lineage push operations from data factory to Microsoft Purview. Grant the data factory’s managed identity Data Curator role on Microsoft Purview root collection.

Purview > Management > Lineage connections > Data Factory > new

Validation: Purview > Data map > Collection > Root collection > Role assignments >

Check that the ADF appears under the “Data Curators” section. That’s it.

Connect ADF to Purview

In the ADF studio: Manage -> Microsoft Purview, and select Connect to a Microsoft Purview account

We will see this

Once a pipeline runs successfully, its activities are captured, and the extracted lineage looks like this.

That’s all for extracting ADF pipeline lineage.

Next step: Day 9 – Managed attributes in Data Map

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

ADF activities failure vs pipeline failure and pipeline error handling logical mechanism

Understanding how failures in individual activities affect the pipeline as a whole is crucial for building robust data workflows.

People who have used SSIS before may be confused by the pipeline failure logic in Azure Data Factory (ADF) or Azure Synapse Analytics (ASA) when they switch. ADF/ASA pipeline orchestration allows conditional logic and enables the user to take a different path based upon the outcome of a previous activity. Using different paths allows users to build robust pipelines and incorporate error handling into ETL/ELT logic.

ADF or ASA activity outcome paths

ADF and ASA provide four outcome paths in total: Upon Success, Upon Failure, Upon Completion, and Upon Skip.

A pipeline can have multiple activities that can be executed in sequence or in parallel.

  • Sequential Execution: Activities are executed one after another.
  • Parallel Execution: Multiple activities run simultaneously.

You are able to add multiple branches following an activity; for each pipeline run, at most one path is activated, based on the execution outcome of the activity.
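In the pipeline JSON, these paths are expressed as dependency conditions on the downstream activity. A minimal sketch, with placeholder activity names (the valid conditions are Succeeded, Failed, Skipped, and Completed):

"dependsOn": [
    {
        "activity": "CopyFiles",
        "dependencyConditions": [ "Failed" ]
    }
]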

Error Handling Mechanism

When an activity fails within a pipeline, several mechanisms can be employed to handle the failure:

In most cases, pipelines are orchestrated in a parallel, serial, or mixed model. The key point is understanding what will happen in the parallel and serial models.

From the upstream (“upon”) activity’s point of view, the basic principles are:

Multiple dependencies with the same source are a logical “OR”.

Multiple dependencies with different sources are a logical “AND”.

Different error handling mechanisms lead to different status for the pipeline: while some pipelines fail, others succeed. We determine pipeline success and failures as follows:

  • Evaluate the outcome of all leaf activities. If a leaf activity was skipped, evaluate its parent activity instead.
  • The pipeline result is Success if and only if all evaluated nodes succeed.

Let us discuss in detail.

Multiple dependencies with the same source

This resembles a serial or sequential model.

How “Serial” pipeline failure is determined

As we develop more complicated and resilient pipelines, it’s sometimes required to introduce conditional executions into our logic: execute a certain activity only if certain conditions are met. At this point, if one or more activities fail while other activities succeed in a pipeline, what is the status of the entire pipeline? Success? Failure? How is pipeline failure determined?

In fact, ADF/ASA has its own logic. Software engineers are used to the customary forms “if … then … else …” and “try … catch …”, so let’s use the developer’s idiom.

Single upon activity (serial model), multiple downstream activities:

Pattern             Downstream paths                   Upon activity   Success-path act1   Failure-path act2   Pipeline status   Comment
try … catch …       Success path only                  Success         Success             –                   Success
try … catch …       Success path only                  Failed          Skipped             –                   Failed
try … catch …       Failure path only                  Failed          –                   Failed              Failed
try … catch …       Failure path only                  Failed          –                   Success             Success           not really a success
if … then … else …  Both success & failure paths       Success         Success             Skipped             Success
if … then … else …  Both success & failure paths       Failed          Skipped             Success             Failed
if … then … else …  Both success & failure paths       Failed          Skipped             Failed              Failed
if … skip … else …  Success, failure, and skip paths   Success         Success             Skipped             Success           skip-path activity is skipped

Scenario 1: Try … catch …

Downstream success path only:
upon act success >> downstream act success >> pipeline Success

Downstream success path only:
upon act failed >> downstream act skipped >> pipeline Failed

Downstream failure path only

upon act failed >> downstream act success >> pipeline success

Scenario 2:

If … then … Else

The pipeline defines both the Upon Success and Upon Failure paths. This approach renders the pipeline failed when the upstream activity fails, even if the Upon Failure path succeeds.

Both success & failure paths

upon act failed >> downstream act (failure path) success >> pipeline failed

Both success & failure paths

upon act failed >> downstream act (failure path) failed >> pipeline failed

Scenario 3

If  …Skip… Else   ….

Both success & failure path, and skip path

upon act success >> downstream act success >> skip path is skipped >> pipeline success

Multiple dependencies with different sources

This resembles “Parallel”; its logic is an “AND”.

Scenario 4:

Upon act 1 success and upon act 2 success >> downstream act success >> pipeline success.

Upon act 1 success and upon act 2 failed >> downstream act success >> pipeline success.

Pay attention: the “Set variable failed” activity uses the “fail” (Upon Failure) path.

That means:

The “set variable success” dependency condition is met (true).

Although the “set variable failed” activity failed, its failure-path dependency condition is also met (true).

Since both conditions are true, the downstream activity runs and the pipeline shows “success”.

Now, let’s try this:

Now the “Set variable failed” activity uses the “success” path; let’s see what the pipeline shows: the pipeline fails.

Why? Because the “Set variable failed” dependency condition is not met, even though the “set variable success” condition is. True AND False = False, so the following activity, “set variable act”, is skipped and never runs; the pipeline fails.

You might immediately realize that once we let the “Set variable failed” branch use the “completion” path, then no matter whether that activity succeeds or fails, the downstream activity “set variable act” will not be skipped, and the pipeline will show success.

Error Handling

Sample error handling patterns

The pattern is equivalent to try catch block in coding. An activity might fail in a pipeline. When it fails, customer needs to run an error handling job to deal with it. However, the single activity failure shouldn’t block next activities in the pipeline. For instance, I attempt to run a copy job, moving files into storage. However it might fail half way through. And in that case, I want to delete the partially copied, unreliable files from the storage account (my error handling step). But I’m OK to proceed with other activities afterwards.

To set up the pattern:

  • Add first activity
  • Add error handling to the UponFailure path
  • Add second activity, but don’t connect to the first activity
  • Connect both UponFailure and UponSkip paths from the error handling activity to the second activity

Error Handling job runs only when First Activity fails. Next Activity will run regardless if First Activity succeeds or not.

Generic error handling

We have multiple activities running sequentially in the pipeline. If any fails, I need to run an error handling job to clear the state, and/or log the error.

For instance, I have sequential copy activities in the pipeline. If any of these fails, I need to run a script job to log the pipeline failure.

To set up the pattern:

  • Build sequential data processing pipeline
  • Add generic error handling step to the end of the pipeline
  • Connect both Upon Failure and Upon Skip paths from the last activity to the error handling activity

The last step, Generic Error Handling, will only run if any of the previous activities fails. It will not run if they all succeed.

You can add multiple activities for error handling.

Summary

Handling activity failures effectively is crucial for building robust pipelines in Azure Data Factory. By employing retry policies, conditional paths, and other error-handling strategies, you can ensure that your data workflows are resilient and capable of recovering from failures, minimizing the impact on your overall data processing operations.

if you have any questions, please do not hesitate to contact me at william. chen @mainri.ca (remove all space from the email account 😊)