Data Flow: Alter Row Transformation

The Alter Row transformation in Azure Data Factory (ADF) mapping data flows marks each row with an insert, update, delete, or upsert policy before it is written to a sink. You define a condition for each policy with data flow expressions and map the input columns to the target columns; the sink then applies the changes to the destination dataset. It works with sinks that support these row-level operations, such as SQL-based sinks, which makes it useful for incremental data loads and for keeping the destination consistent with the source. As with other transformations, proper partitioning helps performance.

What is the Alter Row Transformation?

The Alter Row Transformation is used to set row-level policies for data being written to a sink. This transformation is particularly useful when you are working with slowly changing dimensions (SCD) or when you need to synchronize data between source and sink systems.

Key Features

  1. Define Row Actions:
    • Insert: Add new rows.
    • Update: Modify existing rows.
    • Delete: Remove rows.
    • Upsert: Insert or update rows.
    • No Action: Ignore rows.
  2. Condition-Based Rules:
    • Define rules using expressions for each action.
  3. Works with Supported Sinks:
    • SQL Database, Delta Lake, and more.

How Does the Alter Row Transformation Work?

  1. Input Data: The transformation takes input data from a previous transformation in the data flow.
  2. Define Conditions: You define conditions for each action (insert, update, delete, upsert) using expressions.
  3. Output to Sink: The transformation passes the data to the sink, where the specified actions are performed based on the conditions.
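
To make the three steps concrete, here is a minimal Python sketch of the marking step. It is not ADF code: the `mark_rows` helper and the two rules are hypothetical, and it assumes that a row takes the first policy whose condition is true and that a row matching no condition gets no policy.

```python
from typing import Callable, Optional

Row = dict
Condition = Callable[[Row], bool]

def mark_rows(rows: list[Row], policies: list[tuple[str, Condition]]) -> list[tuple[Optional[str], Row]]:
    """Tag each row with the first policy whose condition is true (None if no rule matches)."""
    marked = []
    for row in rows:
        action = next((name for name, cond in policies if cond(row)), None)
        marked.append((action, row))
    return marked

# Hypothetical rules, for illustration only.
policies = [
    ("delete", lambda r: r["Quantity"] == 1),
    ("upsert", lambda r: r["Quantity"] >= 2),
]

# Three rows shaped like the test data used later in this article.
rows = [
    {"id": 1, "CustID": "C1", "Product": "A", "Quantity": 2, "Amount": 20},
    {"id": 3, "CustID": "C2", "Product": "C", "Quantity": 1, "Amount": 10},
    {"id": 5, "CustID": "C3", "Product": "A", "Quantity": 3, "Amount": 30},
]

marked = mark_rows(rows, policies)
for action, row in marked:
    print(action, row["id"])
```

The transformation itself only tags the rows; the sink is what carries out the insert, update, delete, or upsert.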

Preparing test data

We will focus on the core concepts of the Alter Row transformation.

id  CustID  Product  Quantity  Amount
1   C1      A        2         20
2   C1      B        3         30
3   C2      C        1         10
4   C1      A        2         20
5   C3      A        3         30
6   C2      B        1         10
7   C3      C        2         20
8   C1      C        3         30
9   C1      A        2         20
10  C2      A        1         30
11  C3      C        3         10

Use Alter Row Transformation

Step 1: Create Data Flow

Create a Data Flow, add a source transformation and configure it.

Preview the source data.

Step 2: Add Alter Row Transformation

An alter row condition has four options:

  • Insert if
  • Update if
  • Delete if
  • Upsert if

Use the data flow expression builder to build each condition.

Preview its output.

We must set the order of the conditions, because actions are processed in the order defined.
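
Why the order matters can be sketched in a few lines of Python. This assumes first-match semantics (a row takes the first policy whose condition is true); the two overlapping conditions are hypothetical, and the row is id 10 from the test data.

```python
def first_match(row, policies):
    """Return the name of the first policy whose condition holds, or None."""
    for name, condition in policies:
        if condition(row):
            return name
    return None

# Row id 10 from the test data: it satisfies both conditions below.
row = {"id": 10, "CustID": "C2", "Product": "A", "Quantity": 1, "Amount": 30}

# Hypothetical, deliberately overlapping conditions.
is_small_order = lambda r: r["Quantity"] == 1
is_product_a = lambda r: r["Product"] == "A"

delete_first = [("delete", is_small_order), ("update", is_product_a)]
update_first = [("update", is_product_a), ("delete", is_small_order)]

result_delete_first = first_match(row, delete_first)
result_update_first = first_match(row, update_first)
print(result_delete_first, result_update_first)
```

The same row ends up deleted with one ordering and updated with the other, so put the most specific condition first.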

Step 3: Add Sink transformation

Add a Sink transformation and configure it.

Currently, the Sink transformation supports two kinds of datasets, inline datasets and dataset objects, for stores such as databases, Blob storage, ADLS, and Delta Lake (inline dataset). The detailed list is in the Microsoft documentation.

Inline datasets are recommended when you use flexible schemas, one-off sink instances, or parameterized sinks. If your sink is heavily parameterized, inline datasets allow you to not create a “dummy” object. Inline datasets are based in Spark, and their properties are native to data flow.

Dataset objects are reusable entities that can be used in other data flows and activities such as Copy. 

For this demo, we use Delta as an inline dataset.

When the alter row policy allows delete, update, or upsert, we have to set the primary key column(s) in the sink.
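
The reason a key is needed can be seen in a small Python sketch that treats the sink as a dictionary keyed on `id`. The `apply_to_sink` helper and the rows are hypothetical and only approximate what a real sink does: without a key there would be no way to find the existing row to update or delete.

```python
def apply_to_sink(sink, marked_rows, key="id"):
    """Apply (action, row) pairs to a dict that stands in for a keyed sink table."""
    for action, row in marked_rows:
        k = row[key]
        if action == "insert":
            sink[k] = dict(row)
        elif action == "update":
            # Like a SQL UPDATE: only touches a row that already exists.
            if k in sink:
                sink[k].update(row)
        elif action == "upsert":
            # Update when the key exists, insert otherwise.
            sink[k] = {**sink.get(k, {}), **row}
        elif action == "delete":
            sink.pop(k, None)
    return sink

# Hypothetical current content of the sink, keyed on id.
sink = {
    1: {"id": 1, "Product": "A", "Quantity": 2},
    3: {"id": 3, "Product": "C", "Quantity": 1},
}

marked_rows = [
    ("update", {"id": 1, "Product": "A", "Quantity": 5}),
    ("delete", {"id": 3, "Product": "C", "Quantity": 1}),
    ("upsert", {"id": 12, "Product": "B", "Quantity": 4}),
]

apply_to_sink(sink, marked_rows)
print(sink)
```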

Use Data Flow in Pipeline

We have completed the data flow, and it is ready to be used in a pipeline.

Create a pipeline

Create a pipeline and configure the data flow.

Let's change the source data.

Execute the pipeline again and check the result in the Delta table.

Conclusion

Notes

  • Actions are processed in the order defined.
  • Test rules with Data Preview.
  • Primary Key: The sink must have a key defined, because it is required for update, delete, and upsert operations.

By using the Alter Row Transformation in ADF, you can efficiently manage data changes and ensure that your sink systems are always up-to-date with the latest data from your sources. This transformation is a powerful tool for data engineers working on ETL/ELT pipelines in Azure.

Please do not hesitate to contact me if you have any questions at William . Chen @ mainri.ca

(remove all space from the email account )

Data Flow: Aggregate Transformation

The Aggregate transformation in Azure Data Factory (ADF) Data Flows is a powerful tool for performing calculations on groups of data. It’s analogous to the GROUP BY clause in SQL, allowing you to summarize data based on one or more grouping columns.

Purpose

The Aggregate transformation allows you to:

  • Group data: Group rows based on the values in one or more specified columns.
  • Perform aggregations: Calculate aggregate values (like sum, average, count, min, max, etc.) for each group.

Key Features and Settings:

  • Group By: This section defines the columns by which the data will be grouped. You can select one or more columns. Rows with the same values in these columns will be grouped together.
  • Aggregates: This section defines the aggregations to be performed on each group. You specify:
    • New column name: The name of the resulting aggregated column.
    • Expression: The aggregation function and the column to which it’s applied.

Available Aggregate Functions

ADF Data Flows support a wide range of aggregate functions, including:

  • avg(column): Calculates the average of a column.
  • count(column) or count(): Counts the rows in a group. count() with no argument counts all rows, like count(*) in SQL, even if some columns are null. count(column) counts only non-null values in the specified column.
  • max(column): Finds the maximum value in a column.
  • min(column): Finds the minimum value in a column.
  • sum(column): Calculates the sum of a column.
  • collect(column): Collects all values within a group into an array.
  • first(column): Returns the first value encountered in the group.
  • last(column): Returns the last value encountered in the group.
  • stddev(column): Calculates the standard deviation of a column.
  • variance(column): Calculates the variance of a column.
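
As a rough illustration of what collect, first, last, min, and max return per group, here is a plain Python sketch over the sample dataset used later in this article, grouped by CustID. It is not data flow code, and "first" and "last" here simply follow the input order of the list; in a distributed engine the order within a group may not be guaranteed unless the data is sorted.

```python
from collections import defaultdict

# The article's sample dataset: (CustID, Product, Quantity, Amount)
sales = [
    ("C1", "A", 2, 20), ("C1", "B", 3, 30), ("C2", "C", 1, 10),
    ("C1", "A", 2, 20), ("C3", "A", 3, 30), ("C2", "B", 1, 10),
    ("C3", "C", 2, 20), ("C1", "C", 3, 30), ("C1", "A", 2, 20),
    ("C2", "A", 1, 30), ("C3", "C", 3, 10),
]

# Group rows by CustID, keeping input order inside each group.
groups = defaultdict(list)
for cust_id, product, quantity, amount in sales:
    groups[cust_id].append((product, quantity, amount))

summary = {}
for cust_id, rows in groups.items():
    products = [p for p, _, _ in rows]
    amounts = [a for _, _, a in rows]
    summary[cust_id] = {
        "products": products,            # like collect(Product)
        "first_product": products[0],    # like first(Product)
        "last_product": products[-1],    # like last(Product)
        "min_amount": min(amounts),      # like min(Amount)
        "max_amount": max(amounts),      # like max(Amount)
    }

for cust_id, values in summary.items():
    print(cust_id, values)
```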

Preparing test data

Assuming familiarity with ADF/Synapse, we will focus on the core concepts of the Aggregate transformation.

Sample dataset:
CustID, Product, Quantity, Amount
C1,     A,       2,        20
C1,     B,       3,        30
C2,     C,       1,        10
C1,     A,       2,        20
C3,     A,       3,        30
C2,     B,       1,        10
C3,     C,       2,        20
C1,     C,       3,        30
C1,     A,       2,        20
C2,     A,       1,        30
C3,     C,       3,        10

Create Data Flow

Configure Source

Add Aggregate Transformation

The functionality of the Aggregate transformation is equivalent to that of the GROUP BY clause in T-SQL.

In a SQL script, we would write this query:

select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount 
, avg(amount) as Avg_price
from sales group by product;

and get this result:
product  sold_times  sold_items  sold_amount  Avg_price
A        5           10          120          24.0
B        2           4           40           20.0
C        4           9           70           17.5

The Aggregate transformation is configured in the same way.

We can use the expression builder to write each expression.

It performs the same grouping and aggregation operations as T-SQL's GROUP BY.
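
To try the query outside of ADF, the sketch below loads the sample dataset into an in-memory SQLite database from Python and runs the same GROUP BY. SQLite is only a stand-in for T-SQL here, the column types are assumptions, and an ORDER BY is added so the rows come back in a fixed order.

```python
import sqlite3

# The article's sample dataset: (CustID, Product, Quantity, Amount)
rows = [
    ("C1", "A", 2, 20), ("C1", "B", 3, 30), ("C2", "C", 1, 10),
    ("C1", "A", 2, 20), ("C3", "A", 3, 30), ("C2", "B", 1, 10),
    ("C3", "C", 2, 20), ("C1", "C", 3, 30), ("C1", "A", 2, 20),
    ("C2", "A", 1, 30), ("C3", "C", 3, 10),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table sales (custid text, product text, quantity integer, amount integer)"
)
conn.executemany("insert into sales values (?, ?, ?, ?)", rows)

# Same query as in the article, plus an ORDER BY for a stable row order.
query = """
select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount
, avg(amount) as avg_price
from sales group by product order by product
"""
result = conn.execute(query).fetchall()
for line in result:
    print(line)
```

Running it is a quick way to cross-check the output of the Aggregate transformation in Data Preview.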

Important Considerations

  • Null Handling: Pay attention to how aggregate functions handle null values. For example, sum() ignores nulls, and count(column) counts only non-null values.
  • Data Types: Ensure that the data types of the columns you’re aggregating are compatible with the chosen aggregate functions.
  • Performance: For large datasets, consider partitioning your data before the Aggregate transformation to improve performance.
  • Distinct Count: For calculating distinct counts, use the countDistinct(column) function.
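
The null handling and distinct count points can be illustrated with plain Python, using None in place of null. The amounts are made-up values, and the variable names only mirror the data flow functions they stand for.

```python
# Hypothetical Amount values for one group; None stands in for null.
amounts = [20, None, 30, 20, None]

non_null = [a for a in amounts if a is not None]

count_all_rows = len(amounts)       # counts every row, nulls included
count_column = len(non_null)        # like count(Amount): non-null values only
sum_column = sum(non_null)          # like sum(Amount): nulls are ignored
count_distinct = len(set(non_null)) # like countDistinct(Amount)

print(count_all_rows, count_column, sum_column, count_distinct)
```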

Conclusion

By using the Aggregate transformation effectively, you can efficiently summarize and analyze your data within ADF Data Flows. Remember to carefully consider the appropriate aggregate functions and grouping columns to achieve your desired results.


Using Exists Transformation for Data Comparison in Azure Data Factory/Synapse

In this article, I will discuss the Exists transformation of Data Flow. The Exists transformation is a row filtering transformation that checks whether your data exists in another source or stream. The output stream includes all rows in the left stream that either exist or don't exist in the right stream. The Exists transformation is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

I use the Exists transformation in Azure Data Factory or Synapse data flows to compare source and target data.

Create a Data Flow

Create a Source

Create a DerivedColumn Transformation

The expression uses: sha2(256, columns())
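
The idea behind the hash column can be sketched in Python with hashlib. This is only an approximation: how the data flow engine serializes the column values before hashing is not covered in this article, so the "|" separator and the string conversion are assumptions, and the department name for 1004 is a made-up value.

```python
import hashlib

def row_hash(row: dict, columns: list[str]) -> str:
    """SHA-256 over all column values of a row, joined with an assumed '|' separator."""
    joined = "|".join("" if row[c] is None else str(row[c]) for c in columns)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

columns = ["depid", "depname"]

# Hypothetical rows; "Finance" is an invented department name.
source_row = {"depid": 1004, "depname": "Finance"}
same_row = {"depid": 1004, "depname": "Finance"}
changed_row = {"depid": 1004, "depname": "Finance Dept"}

print(row_hash(source_row, columns))
print(row_hash(source_row, columns) == row_hash(same_row, columns))
print(row_hash(source_row, columns) == row_hash(changed_row, columns))
```

Two rows with identical values give the same hash, and a change in any column gives a different one, so a single column is enough to compare whole rows.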

Create Target and DerivedColumn Transformation

Create the target the same way as the source. To keep the data types the same on both sides, so that the hash values can be compared, I add a Cast transformation.

Then, with the same settings as on the source side, add a DerivedColumn transformation.
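
Why the cast matters: the same logical value stored with a different type can serialize differently, and then the hashes no longer match. A small Python sketch under the same assumptions as before (string conversion and a "|" separator, which are not necessarily what the engine does; the row values are hypothetical):

```python
import hashlib

def row_hash(values) -> str:
    """SHA-256 over the string form of each value, joined with an assumed '|' separator."""
    joined = "|".join(str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

source_row = (1004, "Finance")    # depid stored as an integer
target_row = (1004.0, "Finance")  # same depid, but stored as a floating-point number

# Before casting: "1004" and "1004.0" produce different hashes.
matches_before = row_hash(source_row) == row_hash(target_row)

# After casting the target depid to integer, the hashes agree.
target_cast = (int(target_row[0]), target_row[1])
matches_after = row_hash(source_row) == row_hash(target_cast)

print(matches_before, matches_after)
```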

Exists Transformation to Compare Source and Target

Add an Exists transformation to compare source and target.

The Exists transformation offers two options: Exists and Doesn't Exist. It supports multiple criteria and custom expressions.

Configuration

  1. Choose which data stream you’re checking for existence in the Right stream dropdown.
  2. Specify whether you’re looking for the data to exist or not exist in the Exist type setting.
  3. Select whether or not you want a Custom expression.
  4. Choose which key columns you want to compare as your exists conditions. By default, data flow looks for equality between one column in each stream. To compare via a computed value, hover over the column dropdown and select Computed column.
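
The two exist types behave like a semi-join and an anti-join on the chosen key. Here is a minimal Python sketch with a hypothetical `exists` helper; the rows are invented except for the two depid values and the Wholesale department that appear in the results below, and the key is the tuple of all column values, standing in for the hash column.

```python
def exists(left, right, key, negate=False):
    """Keep the left rows whose key is found in the right stream (or is not, if negate=True)."""
    right_keys = {key(row) for row in right}
    return [row for row in left if (key(row) in right_keys) != negate]

# Hypothetical source (left stream) and target (right stream).
source = [
    {"depid": 1003, "depname": "Wholesale"},
    {"depid": 1004, "depname": "Finance"},
]
target = [
    {"depid": 1004, "depname": "Finance"},
]

# Compare on all column values, as the hash column does.
all_columns = lambda row: (row["depid"], row["depname"])

in_both = exists(source, target, key=all_columns)
only_in_source = exists(source, target, key=all_columns, negate=True)

print([r["depid"] for r in in_both])
print([r["depid"] for r in only_in_source])
```

Only rows from the left stream are returned in either case; the right stream is used purely as a lookup.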

“Exists” option

Now, let's use the “Exists” option.

We get depid = 1004, which exists on both sides.

Doesn’t Exist

Use the “Doesn’t Exist” option.

We get depid = 1003: Wholesale exists on the source side but does NOT exist in the target.

Recap

The “Exists Transformation” is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.
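
For readers who think in SQL, the equivalent queries can be tried from Python against an in-memory SQLite database. The table names, column types, and the Finance row are assumptions for illustration; only depid 1003 (Wholesale) and 1004 come from the example above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table source (depid integer, depname text);
create table target (depid integer, depname text);
insert into source values (1003, 'Wholesale'), (1004, 'Finance');
insert into target values (1004, 'Finance');
""")

# Same idea as the "Exists" option.
exists_rows = conn.execute("""
select s.depid, s.depname from source s
where exists (
    select 1 from target t
    where t.depid = s.depid and t.depname = s.depname
)
""").fetchall()

# Same idea as the "Doesn't Exist" option.
not_exists_rows = conn.execute("""
select s.depid, s.depname from source s
where not exists (
    select 1 from target t
    where t.depid = s.depid and t.depname = s.depname
)
""").fetchall()

print(exists_rows)
print(not_exists_rows)
```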

It is very convenient for comparing data in data engineering projects, e.g. comparing source and target in ETL.
