Data Flow: Alter Row Transformation

The Alter Row transformation in ADF sets row-level policies in a mapping data flow: it marks rows for insert, update, delete, or upsert based on conditions you define, and the sink then applies those changes to the destination dataset. It works with destinations that support CRUD operations, such as SQL-based sinks, which makes it useful for incremental data loads and for keeping source and destination in sync. Configure it in the mapping data flow, define an expression for each policy, map input columns to target columns, and use proper partitioning to optimize performance.

What is the Alter Row Transformation?

The Alter Row Transformation is used to set row-level policies for data being written to a sink. This transformation is particularly useful when you are working with slowly changing dimensions (SCD) or when you need to synchronize data between source and sink systems.

Key Features

  1. Define Row Actions:
    • Insert: Add new rows.
    • Update: Modify existing rows.
    • Delete: Remove rows.
    • Upsert: Insert or update rows.
    • No Action: Ignore rows.
  2. Condition-Based Rules:
    • Define rules using expressions for each action.
  3. Works with Supported Sinks:
    • SQL Database, Delta Lake, and more.

How Does the Alter Row Transformation Work?

  1. Input Data: The transformation takes input data from a previous transformation in the data flow.
  2. Define Conditions: You define conditions for each action (insert, update, delete, upsert) using expressions.
  3. Output to Sink: The transformation passes the data to the sink, where the specified actions are performed based on the conditions.

Preparing test data

We will focus on the Alter Row transformation's core concepts, using the following sample dataset.

id CustID Product Quantity Amount
1  C1      A	  2	 20
2  C1      B	  3	 30
3  C2      C	  1	 10
4  C1      A	  2	 20
5  C3      A	  3	 30
6  C2      B	  1	 10
7  C3      C	  2	 20
8  C1      C	  3	 30
9  C1      A	  2	 20
10 C2      A	  1	 30
11 C3      C	  3	 10

Use Alter Row Transformation

Step 1: Create Data Flow

Create a Data Flow, add a source transformation and configure it.

preview source data

Step 2: Add Alter Row Transformation

The Alter Row condition has four options:

  • Insert if
  • Update if
  • Delete if
  • Upsert if

Use the data flow expression builder to build each condition.
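
For example, with the sample dataset above, a minimal sketch of the conditions could look like this (the business rules here are illustrative, not taken from a real requirement):

Insert if:   isNull(id)        <-- rows without an id are treated as new
Upsert if:   Quantity > 0      <-- rows with a positive quantity are inserted or updated
Delete if:   Quantity <= 0     <-- rows with zero or negative quantity are removed from the sink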

Preview the output.

Pay attention to the order of the conditions: actions are processed in the order they are defined, and the first matching condition determines the row's policy.

Step 3: Add Sink transformation

Add a Sink transformation and configure it.

Currently, the Sink transformation supports a range of datasets through two reference types, inline datasets and dataset objects, covering sinks such as Database, Blob, ADLS, and Delta Lake (inline dataset); see the Microsoft documentation for the detailed list.

Inline datasets are recommended when you use flexible schemas, one-off sink instances, or parameterized sinks. If your sink is heavily parameterized, inline datasets allow you to not create a “dummy” object. Inline datasets are based in Spark, and their properties are native to data flow.

Dataset objects are reusable entities that can be used in other data flows and activities such as Copy. 

For this demo, we are using a Delta inline dataset.

When the alter row policy allows Delete, Update, or Upsert, we have to set the key column(s) on the sink.
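
For the sample data in this demo, a minimal sketch of that setting would be to list id as the key column in the sink settings (the column name comes from the sample dataset above; use your sink table's actual primary key).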

Use Data Flow in Pipeline

We have completed the data flow; it is ready to be used in a pipeline.

Create a pipeline

Create a pipeline and configure the data flow.

Let's change the source data.

Execute the pipeline again and check the resulting Delta table.

Conclusion

Notes

  • Actions are processed in the order defined.
  • Test rules with Data Preview.
  • Primary key: the sink must have a primary key defined, as it is required for update, delete, and upsert operations.

By using the Alter Row Transformation in ADF, you can efficiently manage data changes and ensure that your sink systems are always up-to-date with the latest data from your sources. This transformation is a powerful tool for data engineers working on ETL/ELT pipelines in Azure.

Please do not hesitate to contact me if you have any questions at William . Chen @ mainri.ca

(remove all space from the email account )

Data Flow: Aggregate Transformation

The Aggregate transformation in Azure Data Factory (ADF) Data Flows is a powerful tool for performing calculations on groups of data. It’s analogous to the GROUP BY clause in SQL, allowing you to summarize data based on one or more grouping columns.

Purpose

The Aggregate transformation allows you to:

  • Group data: Group rows based on the values in one or more specified columns.
  • Perform aggregations: Calculate aggregate values (like sum, average, count, min, max, etc.) for each group.

Key Features and Settings:

  • Group By: This section defines the columns by which the data will be grouped. You can select one or more columns. Rows with the same values in these columns will be grouped together.
  • Aggregates: This section defines the aggregations to be performed on each group. You specify:
    • New column name: The name of the resulting aggregated column.
    • Expression: The aggregation function and the column to which it’s applied.

Available Aggregate Functions

ADF Data Flows support a wide range of aggregate functions, including:

  • avg(column): Calculates the average of a column.
  • count(column) or count(*): Counts the number of rows in a group. count(*) counts all rows, even if some columns are null. count(column) counts only non-null values in the specified column.
  • max(column): Finds the maximum value in a column.
  • min(column): Finds the minimum value in a column.
  • sum(column): Calculates the sum of a column.
  • collect(column): Collects all values within a group into an array.
  • first(column): Returns the first value encountered in the group.
  • last(column): Returns the last value encountered in the group.
  • stddev(column): Calculates the standard deviation of a column.
  • variance(column): Calculates the variance of a column.

Preparing test data

Assuming familiarity with ADF/Synapse, we will focus on the Aggregate transformation's core concepts.

sample dataset
CustID Product Quantity Amount
C1,     A,      2,      20
C1,     B,      3,      30
C2,     C,      1,      10
C1,     A,      2,      20
C3,     A,      3,      30
C2,     B,      1,      10
C3,     C,      2,      20
C1,     C,      3,      30
C1,     A,      2,      20
C2,     A,      1,      30
C3,     C,      3,      10

Create Data Flow

Configure Source

Add Aggregate Transformation

The functionality of the Aggregate transformation is equivalent to that of the GROUP BY clause in T-SQL.

In SQL, we would write this query:

select product
, count(quantity) as sold_times
, sum(quantity) as sold_items
, sum(amount) as sold_amount 
, avg(amount) as Avg_price
from sales group by product;

and get this result:
product  sold_times  sold_items  sold_amount  Avg_price
A        5           10          120          24.0
B        2           4           40           20.0
C        4           9           70           17.5

With the Aggregate transformation, this is done by grouping on Product and defining the aggregates.

We can use the expression builder to write each aggregate expression.

It performs the same grouping and aggregation operations as TSQL’s GROUP BY.
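
A sketch of the equivalent Aggregate transformation settings (column names come from the sample dataset; the aggregate names mirror the SQL aliases above):

Group by:  Product

Aggregates:
  sold_times   :  count(Quantity)
  sold_items   :  sum(Quantity)
  sold_amount  :  sum(Amount)
  Avg_price    :  avg(Amount)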

Important Considerations

  • Null Handling: Pay attention to how aggregate functions handle null values. For example, sum() ignores nulls, while count(column) only counts non-null values.
  • Data Types: Ensure that the data types of the columns you’re aggregating are compatible with the chosen aggregate functions.
  • Performance: For large datasets, consider partitioning your data before the Aggregate transformation to improve performance.
  • Distinct Count: For calculating distinct counts, use the countDistinct(column) function.
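
For example, the number of distinct customers per product could be added as one more aggregate on the sample dataset (the column name distinct_customers is illustrative):

  distinct_customers : countDistinct(CustID)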

Conclusion

By using the Aggregate transformation effectively, you can efficiently summarize and analyze your data within ADF Data Flows. Remember to carefully consider the appropriate aggregate functions and grouping columns to achieve your desired results.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account )

Using Exists Transformation for Data Comparison in Azure Data Factory/Synapse

In this article, I will discuss the Exists transformation in Data Flow. The Exists transformation is a row filtering transformation that checks whether your data exists in another source or stream. The output stream includes all rows in the left stream that either exist or don't exist in the right stream. The Exists transformation is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

I use the Exists transformation in Azure Data Factory or Synapse data flows to compare source and target data.

Create a Data Flow

Create a Source

Create a DerivedColumn Transformation

The expression uses: sha2(256, columns())

Create the target and its DerivedColumn transformation

Create the target the same way as the source. To keep the data types the same so that we can use hash values to compare, I add a Cast transformation; then, the same as in the source setup, add a DerivedColumn transformation.
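
As a sketch, both DerivedColumn transformations add the same hash column (the name rowHash is my own; the article does not name it), and this column becomes the comparison key in the Exists transformation:

  rowHash : sha2(256, columns())    <-- 256-bit hash computed over all columns of the row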

Exists Transformation to compare Source and target

Add an Exists transformation to compare the source and target.

The Exists function offers two options: Exists and Doesn’t Exist. It supports multiple criteria and custom expressions.

Configuration

  1. Choose which data stream you’re checking for existence in the Right stream dropdown.
  2. Specify whether you’re looking for the data to exist or not exist in the Exist type setting.
  3. Select whether or not you want a Custom expression.
  4. Choose which key columns you want to compare as your exists conditions. By default, data flow looks for equality between one column in each stream. To compare via a computed value, hover over the column dropdown and select Computed column.

“Exists” option

Now, let's use the "Exists" option.

We got this result: depid = 1004 exists.

Doesn’t Exist

Use the "Doesn't Exist" option.

We got depid = 1003: wholesale exists on the source side but does NOT exist in the target.

Recap

The “Exists Transformation” is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.

It is very convenient for data comparison in data engineering projects, e.g. ETL source-to-target comparison.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Change Data Capture with Azure Data Factory and Synapse Analytics

When we perform data integration and ETL processes, the most effective approach is to read only the source data that has changed since the last time the pipeline ran, rather than querying the entire dataset on each run.

We will explore the different Change Data Capture (CDC) capabilities available in Azure Data Factory and Azure Synapse Analytics: CDC in mapping data flows, top-level CDC in ADF, and Synapse Link.

Support data source and target

Currently, ADF supports the following data sources and targets.

Supported data sources

  • Avro
  • Azure Cosmos DB (SQL API)
  • Azure SQL Database
  • Azure SQL Managed Instance
  • Delimited Text
  • JSON
  • ORC
  • Parquet
  • SQL Server
  • XML
  • Snowflake

Supported targets

  • Avro
  • Azure SQL Database
  • SQL Managed Instance
  • Delimited Text
  • Delta
  • JSON
  • ORC
  • Parquet
  • Azure Synapse Analytics

Azure Synapse Analytics as Target

When using Azure Synapse Analytics as the target, the Staging Settings section is available on the main table canvas. Enabling staging is mandatory when selecting Azure Synapse Analytics as the target.

Staging Settings can be configured in two ways: Factory settings or Custom settings. Factory settings apply at the factory level; if they haven't been configured yet, you'll be directed to the global staging settings section to configure them the first time. Once set, all top-level CDC resources adopt this configuration. Custom settings are scoped only to the CDC resource for which they are configured and override the Factory settings.

Known limitations

  • Currently, when creating source/target mappings, each source and target is only allowed to be used once.
  • Complex types are currently unsupported.
  • Self-hosted integration runtime (SHIR) is currently unsupported.

CDC ADLS to SQL Database

Create a CDC artifact

Go to the Author pane in data factory. Below Pipelines, a new top-level artifact called Change Data Capture (preview) appears.

Configuring Source properties

Use the dropdown list to choose your data source. For this demo, select DelimitedText.

To support Change Data Capture (CDC), it’s recommended to create a dedicated Linked Service, as current implementations use a single Linked Service for both source and target.

You can choose to add multiple source folders by using the plus (+) button. The other sources must also use the same linked service that you already selected.

Configuring target

This demo uses a SQL database and a dedicated Linked Service for CDC.

configuring the target table

If existing tables at the target have matching names, they’re selected by default under Existing entities. If not, new tables with matching names are created under New entities. Additionally, you can edit new tables by using the Edit new tables button.

The capturing change data studio appears.

Let's click "Column mappings".

If you want to enable the column mappings, select the mappings and turn off the Auto map toggle. Then, select the Column mappings button to view the mappings. You can switch back to automatic mapping anytime by turning on the Auto map toggle.

Configure CDC latency

After your mappings are complete, set your CDC latency by using the Set Latency button.

Publish and start CDC

After you finish configuring your CDC, select Publish all to publish your changes, then Start to start running your change data capture.

Monitoring CDC

To monitor CDC, we can use either the ADF studio monitor or the CDC studio.

Once the data changes, CDC automatically detects and tracks the changes and delivers them to the target.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Building Slowly Changing Dimensions Type 2 in Azure Data Factory and Synapse

Within the context of enterprise data warehousing, the effective management of historical data is essential for supporting informed business decision-making. Slowly Changing Dimension (SCD) Type 2 is a widely adopted technique for addressing changes in data over time.

A brief overview of Slowly Changing Dimensions Type 2

Slowly Changing Dimensions Type 2 (SCD Type 2) is a common solution for managing historical data. To ensure clarity, I’ll briefly recap SCD Type 2.

A Type 2 of SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record.

Existing Dimension data
surrokey	depID	dep	IsActivity
1	        1001	IT	1
2	        1002	HR	1
3	        1003	Sales	1
Dimension changed and new data comes 
depId dep
1003  wholesale   <--- depID is same, name changed from "Sales" to "wholesale"
1004  Finance     <--- new data

Mark existing dimensional records as expired (inactive); create a new record for the current dimensional data; and insert new incoming data as new dimensional records.

Now, the new Dimension will be:
surrokey  depID	dep	   IsActivity
1	  1001	IT	   1   <-- No action required
2	  1002	HR	   1   <-- No action required
3	  1003	Sales	   0   <-- mark as inactive
4         1003  wholesale  1   <-- add updated active value
5         1004  Finance    1   <-- insert new data

This solution demonstrates the core concepts of a Slowly Changing Dimension (SCD) Type 2 implementation. While it covers the major steps involved, real-world production environments often have more complex requirements. When designing dimension tables (e.g., the dep table), I strongly recommend adding more descriptive columns to enhance clarity. Specifically, including [Start_active_date] and [End_active_date] columns significantly improves the traceability and understanding of dimension changes over time.

Implementing SCD Type 2

Step 1: Create a Dimension Table- dep

-- Create table
create table dep (
surrokey int IDENTITY(1, 1), 
depID int, 
dep varchar(50), 
IsActivity bit);

-- Insert data; the table now contains:
surrokey	depID	dep	IsActivity
1	        1001	IT	1
2	        1002	HR	1
3	        1003	Sales	1
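
A minimal sketch of the insert statements for these seed rows (surrokey is an IDENTITY column, so it is not supplied explicitly):

INSERT INTO dep (depID, dep, IsActivity)
VALUES (1001, 'IT', 1),
       (1002, 'HR', 1),
       (1003, 'Sales', 1);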

Step 2: Create Data Flow

Add the source dataset. The dataset should point to the file located in your source layer.

We have 2 data rows: depID = 1003 has an updated value, and a new record, depID = 1004, needs to be added to the dimension table.

Step 3: Add derived column

Add a Derived Column transformation, add a column named isactive, and set its value to 1.

Step 4: Sink dimension data

Create a dataset pointing to the SQL Server database table dep.

Add a Sink using the above dataset, SQLServer_dep_table.

Configure the sink mappings as shown below.

Step 5: Add SQL dataset as another source.

Step 6: Rename columns from the database table dep

Use a Select transformation to rename the columns from the SQL table.

Rename the columns:

  • depID –> sql_depID
  • dep –> sql_dep
  • Isactivity –> sql_IsActivity

Step 7: Lookup

Add a Lookup to join the new dimension data that we imported in "srcDep" at Step 2.

At this step, the existing dimension table is left-joined with the incoming dimension data (rows that need updated info or brand-new dimension values).

  • For the existing dimension data with depID = 1003, "dep" was previously "Sales" and now needs to change to "wholesale".
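
As a sketch of the lookup condition (the article does not spell it out, so this assumes the match is on the department key, with the renamed SQL columns on the left and the incoming source columns on the right):

  sql_depID == depID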

Step 8: Keep only non-null rows

Add a Filter transformation to keep only the rows whose source-file columns are non-null.

Filter expression : depID column is not null. 
!isNull(depid)

This requires filtering the ‘lkpNeedUpdate’ lookup output to include only rows where the depID is not null.

Step 9: Select the needed columns

The upstream "filterNonNull" outputs more columns than we need. The objective is to use the new data (containing depid and dep) to update the existing information in the dimension table (specifically sql_depID, sql_dep, and sql_IsActivity) and mark the old information as inactive.

Add a Select transformation to pick the columns that we are going to insert or update in the database dimension table.

Step 10: Add a new column with value "0"

Add a Derived Column transformation and set the new column's value to "0", which marks the row as inactive.

Step 11: Alter Row

Add an Alter Row transformation to update the row information.

Configure the alter condition:

Update     1==1 

Step 12: Sink the updated information

We have updated the existing rows, marking them "0" (inactive). It is time to save them into the database dimension table.

Add a Sink pointing to the database dimension table dep, and map the columns:

sql_depid  ---> depID
sql_dep  ---> dep
ActivityStatus  ---> IsActivity

Step 13: Adjust Sink order

As there are two sinks, one designated for the source data and the other for the updated data, a specific processing order must be enforced.

Click on a blank area of the canvas and, on the "Settings" tab, configure the sink order:
1: sinkUpdated
2: sinkToSQLDBdepTable

Step 14: Create a pipeline

Create a pipeline, add this data flow, and run it.

SELECT TOP (5000) [surrokey]
      ,[depID]
      ,[dep]
      ,[IsActivity]
  FROM [williamSQLDB].[dbo].[dep]

surrokey  depID	  dep	       IsActivity
1	  1001	  IT	        1
2	  1002	  HR	        1
3	  1003    Sales	        0
4	  1003    Wholesale	1
5	  1004	  Finance	1

Conclusion

In conclusion, we have walked through an implementation of Slowly Changing Dimensions Type 2, which should give you a comprehensive understanding of how to effectively implement SCD Type 2 in your data warehousing projects, leveraging modern technologies and following industry best practices.

By implementing SCD Type 2 according to Ralph Kimball’s approach, organizations can achieve a comprehensive view of dimensional data, enabling accurate trend analysis, comparison of historical performance, and tracking of changes over time. It empowers businesses to make data-driven decisions based on a complete understanding of the data’s evolution, ensuring data integrity and reliability within the data warehousing environment.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Dynamic ETL Mapping in Azure Data Factory/Synapse Analytics: Source-to-Target Case Study Implementation (1)

This article is part of a series dedicated to dynamic ETL Source-to-Target Mapping (STM) solutions, covering both batch and near real-time use cases. The series will explore various mapping scenarios, including one-to-many, many-to-one, and many-to-many relationships, with implementations provided throughout.

You need privileges to create and alter the metadata table, and to alter the target table.

Scenario

In this article, I will focus on a scenario where the source schema may have new or missing columns, or the destination schema may have columns with different names or may lack columns for new incoming source fields.

Requirement:

Dynamically handle source variations, map data to a consistent destination schema, and handle missing columns gracefully: give a default value to missing columns and add new columns to the target DB table when new source fields arrive.

Source:

CSV; the schema varies between executions (columns may be missing, reordered, or new).
Current source column names: name, age, gender and state.

Destination:

Database: SQL DB;
column names: emp_ID, emp_Name, emp_age, gender, dep_id.

Problem:

  • emp_ID and dep_id are missing from the source data.
  • Column names are not exactly the same:
    name <—-> emp_Name
    age <—-> emp_age
  • The target DB table does not have the column "state".

Source data:
name age gander state
Bill 32 M NY
Mary 34 F CA
Tom 23 M FL
Jim 26 M CA
Afton_Taborek 45 F FL
Amélie_Gilker 43 M NY
target SQLDB Table: emp
CREATE TABLE [dbo].[emp](
[emp_id] [nvarchar](50) NULL,
[emp_name] [nvarchar](50) NULL,
[emp_age] [nvarchar](50) NULL,
[gender] [nvarchar](50) NULL,
[dep_id] [nvarchar](50) NULL
)
col_name data_type allow_null
emp_id nvarchar(50) null
emp_name nvarchar(50) null
emp_age nvarchar(50) null
gender nvarchar(50) null
dep_id nvarchar(50) null

Key components and steps of the solution:

  1. Create a metadata table to hold the STM plan.
  2. A Get Metadata activity retrieves the source data schema (column names and data types).
  3. Reset the fields' active status to False.
  4. A ForEach activity processes each incoming source field: look it up in the metadata table, register its field mapping, and add the target column if needed.
  5. Retrieve each field's STM mapping plan from the metadata table and generate the complete mapping plan.
  6. A Copy activity applies the mapping plan through the "Dynamic Content" of its Mapping.

Solution

Step 1: Create a metadata table to hold mapping plan

CREATE TABLE [dbo].[metadata](
	[source_filename] [varchar](max) NULL,
	[src_col] [varchar](50) NULL,
	[src_dataType] [varchar](50) NULL,
	[src_col_createdate] [datetime] DEFAULT getdate() NULL,
	[src_col_activity] [bit] NULL,
	[destination_schema] [varchar](50) NULL,
	[destination_table] [varchar](50) NULL,
	[dst_col] [varchar](50) NULL,
	[dst_dataType] [varchar](50) NULL,
	[dst_createdate] [datetime] DEFAULT getdate() NULL,
	[dst_col_activity] [bit] NULL,
	[mapping] [varchar](max) NULL
)

The [mapping] column will hold a JSON-style string that indicates which target column this source column maps to. Its pattern is:

{
  "source": {
     "name": "Field/column name",
     "type": "column generalized dataType",
     "physicalType":"coming column's Native dataType"
  },
  "sink": {
     "name": "Table column name",
     "type": "coming column's generalized dataType",
     "physicalType":"column's target database Native dataType"
  } 
}
“name”

Field's name in the file, or the column's name in the DB table.

“type”

Logical Data Type. The abstract or generalized type used by Azure Data Factory (ADF) to interpret data regardless of the underlying system or format.
For example, string, integer, double etc.

“physicalType”

The specific type defined by the database or file system where the data resides.
For example, VARCHAR, NVARCHAR, CHAR, INT, FLOAT, NUMERIC(18,10), TEXT etc. in database

Each column gets such a source-to-sink mapping plan; we will concatenate all the columns' mapping plans to generate a complete source-to-target mapping (STM) plan.

Step 2: Creating known field-column mapping plan

For each known field or column, create a source-to-target mapping plan and save it in the "mapping" column of the metadata table as a JSON-style string.

# id field mapping plan
{
  "source": {
     "name": "id",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_id",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# name field mapping plan
{
  "source": {
     "name": "name",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_name",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# age field mapping plan
{
  "source": {
     "name": "age",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "emp_age",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# gander field mapping plan
{
  "source": {
     "name": "gander",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "gender",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

# "dep_id"field mapping plan
{
  "source": {
     "name": "dep_id",
     "type": "string",
     "physicalType":"string"
  },
  "sink": {
     "name": "dep_id",
     "type": "nvarchar(max)",
     "physicalType":"nvarchar(max)"
  } 
}

We will utilize the column mapping plans to generate a comprehensive “copy activity” mapping plan.

For any new or unknown fields that may arise, we will address them in subsequent steps.

Step 3: get source metadata

Create a pipeline.

I name it "pl_dynamic_source_to_target_mapping".

Create variables

  • var_sourcename, string
  • var_field_name, string
  • var_field_type, string
  • var_mapping_plan, string

Add a "Get Metadata" activity and set it up.

We need the following field list:

  • Item name,
  • Item type,
  • structure.

The "Get Metadata" activity returns:

{
	"itemName": "name.csv",
	"itemType": "File",
	"structure": [
		{
			"name": "name",
			"type": "String"
		},
		{
			"name": "age",
			"type": "String"
		},
		{
			"name": "gander",
			"type": "String"
		},
		{
			"name": "state",
			"type": "String"
		}
	],
	"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
	"executionDuration": 1,
	"durationInQueue": {
		"integrationRuntimeQueue": 0
	},
	"billingReference": {
		"activityType": "PipelineActivity",
		"billableDuration": [
			{
				"meterType": "AzureIR",
				"duration": 0.016666666666666666,
				"unit": "Hours"
			}
		]
	}
}

Step 4: Reset the activity status of all source fields in the metadata table to False

Save source data name

Since we will process the item's metadata field by field later, it is convenient to save the source data name in a variable.

Add a "Set variable" activity to save the source data name in the variable "var_sourcename".

Reset all source fields to False

Add a "Lookup" activity to reset the activity status of all source fields in the metadata table to False.

Lookup query:

UPDATE metadata SET
src_col_activity = 0
WHERE source_filename = '@{variables('var_sourcename')}';
SELECT 1;

This is one of the important steps. It allows us to focus on the incoming source fields. When we build the complete ETL Source-to-Target mapping plan, we will utilize these incoming fields.

Step 5: ForEach address source data fields

Add a ForEach activity to the pipeline, using the 'structure' output to iterate over the source data fields one by one.

Save source data field name and data type

In the ForEach activity, add two "Set variable" activities to save the source data field name and data type in variables:
ForEach’s @item().name —> var_field_name
ForEach’s @item().type —> var_field_type

Lookup source fields in metadata table

Continuing in the ForEach activity, add a "Lookup" activity and create a dataset pointing to the metadata table.

Lookup query:

IF NOT EXISTS (
SELECT src_col from metadata
WHERE
source_filename = '@{variables('var_sourcename')}'
AND src_col = '@{variables('var_field_name')}'
)
BEGIN
-- Alter target table schema
IF NOT EXISTS ( SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'emp' AND COLUMN_NAME = '@{variables('var_field_name')}' )
ALTER TABLE emp ADD @{item().name} NVARCHAR(max);
SELECT 'target altered'; -- return

-- insert field metadata and STM plan
INSERT INTO metadata
(source_filename
, src_col
, src_dataType
, src_col_activity
, destination_schema
, destination_table
, dst_col
, dst_dataType
, dst_col_activity
, mapping)
VALUES
(
'@{variables('var_sourcename')}'
, '@{variables('var_field_name')}'
, '@{variables('var_field_type')}'
, 1
, 'dbo'
, 'emp'
, '@{variables('var_field_name')}'
, 'NVARCHAR'
, 1
, '{
"source": {
"name": "@{variables('var_field_name')}",
"type": "@{variables('var_field_type')}",
"physicalType":"@{variables('var_field_type')}"
},
"sink": {
"name": "@{variables('var_field_name')}",
"type": "nvarchar(max)",
"physicalType":"nvarchar(max)"
}
}'
);
SELECT 'insert field metadata';-- return
END
ELSE
BEGIN
UPDATE metadata SET src_col_activity = 1
WHERE source_filename = '@{variables('var_sourcename')}'
AND src_col = '@{variables('var_field_name')}'
select 'this field activated'; -- return
END;

The query does the following:

  1. Check whether the current source field exists in the metadata table. If the field's name is found, update its activity status to True (it is an existing field). If it is not present, it is a new field: insert it into the metadata table and establish its mapping plan to specify its intended destination.
  2. Check the target table [emp] to verify whether the column exists. If the column is not present, alter the schema of the target table [emp] to add the new column.

The target table schema is altered.

The metadata of the new field, "state", is inserted into the metadata table.

New field mapping plan:

'{
  "source": {
     "name": "@{variables('var_field_name')}",
     "type": "@{variables('var_field_type')}",
     "physicalType":"@{variables('var_field_type')}"
  },
  "sink": {
     "name": "@{variables('var_field_name')}",
     "type": "@{variables('var_field_type')}",
     "physicalType":"@{variables('var_field_type')}"
  } 
}'

Step 6: Generate the complete ETL mapping plan

Add a "Lookup" activity that uses the metadata table dataset to generate the complete ETL mapping plan.

This Lookup activity queries all active field mapping plans from the metadata table and assembles the complete STM mapping plan.

Query:
select 
concat(
'{"type": "TabularTranslator",
"mappings": [' 
, string_agg(mapping,',') 
,'],'
,'"typeConversion": true,"typeConversionSettings": {"allowDataTruncation": false, "treatBooleanAsNumber": false}'
,'}'  -- closes the outer JSON object
) as stm
from metadata
where 
[source_filename] = '@{variables('var_sourcename')}' 
and [src_col_activity] = 1

Also add a "Set variable" activity to save the STM to the variable "var_mapping_plan":

@activity('lkp generate entire ETL mapping  plan').output.firstRow.stm
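
For illustration, with the sample source above the assembled value of var_mapping_plan would look roughly like this (shortened to two mappings for readability; the real string contains one entry per active field from the metadata table):

{"type": "TabularTranslator",
 "mappings": [
   { "source": { "name": "name", "type": "string", "physicalType": "string" },
     "sink":   { "name": "emp_name", "type": "nvarchar(max)", "physicalType": "nvarchar(max)" } },
   { "source": { "name": "state", "type": "String", "physicalType": "String" },
     "sink":   { "name": "state", "type": "String", "physicalType": "String" } }
 ],
 "typeConversion": true,
 "typeConversionSettings": { "allowDataTruncation": false, "treatBooleanAsNumber": false }
}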

Step 7: Copy source data to target

Having established the dynamic mapping plan, we are now prepared to ingest data from the source and deliver it to the target. All preceding steps were dedicated to the development of the ETL mapping plan.

Copy activity: Applying the STM mapping plan

Add a "Copy activity", using the source and sink datasets we built previously.

Switch to the "Mapping" tab, click "Add dynamic content", and write the expression:

@json(variables('var_mapping_plan'))

Done !!!

Afterword

This article focuses on explaining the underlying logic of dynamic source-to-target mapping through a step-by-step demonstration. To clearly illustrate the workflow and logic flow, four “Set Variable” activities and four pipeline variables are included. However, in a production environment, these are not required.

Having demonstrated dynamic source-to-target mapping with all necessary logic flow steps, this solution provides a foundation that can be extended to other scenarios, such as one-to-many, many-to-one, and many-to-many mappings. Implementations for these scenarios will be provided later.

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
(remove all space from the email account 😊)

Azure Data Factory or Synapse Copy Activity with File System

In Azure Data Factory (ADF) or Synapse, using Copy Activity with a File System as a source or sink is common when dealing with on-premises file systems, network file shares, or even cloud-based file storage systems. Here’s an overview of how it works, key considerations, and steps to set it up.

Key Components and setup with File System:

Create a File System Linked Service

Linked Service: For on-premises or network file systems, you typically need a Self-hosted Integration Runtime (SHIR).

Fill in the required fields:

  • Connection: Specify the file system type (e.g., network share or local path).
  • Authentication: Provide the appropriate credentials, such as username/password, or key-based authentication.
  • If the file system is on-premises, configure the Self-hosted Integration Runtime to access it.
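
For reference, a minimal sketch of a file system linked service definition (the connector type is FileServer; the names, share path, and integration runtime reference below are placeholders, so adjust them to your environment):

{
    "name": "LS_FileSystem",
    "properties": {
        "type": "FileServer",
        "typeProperties": {
            "host": "\\\\myserver\\share",
            "userId": "mydomain\\myuser",
            "password": { "type": "SecureString", "value": "<password>" }
        },
        "connectVia": { "referenceName": "MySelfHostedIR", "type": "IntegrationRuntimeReference" }
    }
}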

Create File System Dataset

Go to Datasets in ADF and create a new dataset. Select File System as the data source.

Configure the dataset to point to the correct file or folder:

  • Specify the File Path.
  • Define the file format (e.g., CSV, JSON, XML).
  • Set any schema information if required (for structured data like CSV).

Considerations:

  • Integration Runtime: For on-premises file systems, the Self-hosted Integration Runtime (SHIR) is essential to securely move data from private networks.
  • Performance: Data transfer speeds depend on network configurations (for on-prem) and ADF’s parallelism settings.
  • File Formats: Ensure proper handling of different file formats (e.g., CSV, JSON, Binary etc.) and schema mapping for structured files.
  • Security: Ensure credentials and network configurations are correctly set up, and consider encryption if dealing with sensitive data.

Common Errors:

  • Connection issues: If the SHIR is not correctly configured, or if there are issues with firewall or network settings, ADF may not be able to access the file system.
  • Permission issues: Ensure that the correct permissions are provided to access the file system (file share, SMB, FTP, etc.).

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)

Comparative Analysis of Linked Services in Azure Data Factory and Azure Synapse Analytics

Azure Data Factory (ADF) and Azure Synapse Analytics (ASA) both utilize linked services to establish connections to external data sources, compute resources, and other Azure services. While they share similarities, there are key differences in their implementation, use cases, and integration capabilities.

When you create linked services, some of them look similar but have slightly different purposes and some key differences, for example "Databricks" vs. "Databricks Delta Lake", and "REST" vs. "HTTP". Here are side-by-side comparisons of the differences.

"Linked Service to Databricks" vs. "Linked Service to Databricks Delta Lake"

Here’s a side-by-side comparison between Databricks and Databricks Delta Lake Linked Services in Azure Data Factory (ADF):

Key differences and when to use which:

  • Databricks Linked Service is for connecting to the compute environment (jobs, notebooks) of Databricks.
  • Databricks Delta Lake Linked Service is for connecting directly to Delta Lake data storage (tables/files).

Feature-by-feature comparison:

  • Purpose
    • Databricks Linked Service: Connect to an Azure Databricks workspace to run jobs or notebooks.
    • Databricks Delta Lake Linked Service: Connect to Delta Lake tables within Azure Databricks.
  • Primary Use Case
    • Databricks: Run notebooks, Python/Scala/Spark scripts, and perform data processing tasks on Databricks.
    • Databricks Delta Lake: Read/write data from/to Delta Lake tables for data ingestion or extraction.
  • Connection Type
    • Databricks: Connects to the compute environment of Databricks (notebooks, clusters, jobs).
    • Databricks Delta Lake: Connects to data stored in Delta Lake format (structured data files).
  • Data Storage
    • Databricks: Not focused on specific data formats; used for executing Databricks jobs.
    • Databricks Delta Lake: Specifically used for interacting with Delta Lake tables (backed by Parquet files).
  • ACID Transactions
    • Databricks: Does not inherently support ACID transactions (although Databricks jobs can handle them in notebooks).
    • Databricks Delta Lake: Delta Lake supports ACID transactions (insert, update, delete) natively.
  • Common Activities
    • Databricks: Running Databricks notebooks, submitting Spark jobs, data transformation using PySpark, Scala, etc.
    • Databricks Delta Lake: Reading from or writing to Delta Lake; ingesting or querying large datasets with Delta Lake's ACID support.
  • Input/Output
    • Databricks: Input/output via Databricks notebooks, clusters, or jobs.
    • Databricks Delta Lake: Input/output via Delta Lake tables/files (with versioning and schema enforcement).
  • Data Processing
    • Databricks: Focus on data processing (ETL/ELT) using Databricks compute power.
    • Databricks Delta Lake: Focus on data management within the Delta Lake storage layer, including handling updates and deletes.
  • When to Use
    • Databricks: When you need to orchestrate and run Databricks jobs for data processing.
    • Databricks Delta Lake: When you need to read or write data specifically stored in Delta Lake, or when managing big data with ACID properties.
  • Integration in ADF Pipelines
    • Databricks: Execute Databricks notebook activities or custom scripts in ADF pipelines.
    • Databricks Delta Lake: Access Delta Lake as a data source/destination in ADF pipelines.
  • Supported Formats
    • Databricks: Any format depending on the jobs or scripts running in Databricks.
    • Databricks Delta Lake: Primarily deals with Delta Lake format (which is based on Parquet).

REST Linked Service and HTTP Linked Service

In Azure Data Factory (ADF), both the REST and HTTP linked services are used to connect to external services, but they serve different purposes and have distinct configurations.

When to use which?

  • REST Linked Service: Use it when working with APIs that require advanced authentication, return paginated JSON data, or have dynamic query/header needs.
  • HTTP Linked Service: Use it for simpler tasks like downloading files from a public or basic-authenticated HTTP server.

Feature                   REST Linked Service            HTTP Linked Service
Purpose                   Interact with RESTful APIs     General-purpose HTTP access
Authentication Methods    AAD, Service Principal, etc.   Basic, Anonymous
Pagination Support        Yes                            No
Dynamic Headers/Params    Yes                            Limited
File Access               No                             Yes
Data Format               JSON                           File or raw data

Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca

(remove all space from the email account 😊)