In this article, I will discuss the Exists transformation in Data Flow. The Exists transformation is a row-filtering transformation that checks whether your data exists in another source or stream. The output stream includes all rows in the left stream that either exist or don't exist in the right stream. The Exists transformation is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.
I use the Exists transformation in Azure Data Factory or Synapse data flows to compare source and target data. This is the most straightforward and generally preferred option.
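For intuition only, here is a minimal Spark SQL sketch of the same idea, assuming "source" and "target" are registered as temporary views sharing a key column "depid" (these names are illustrative, not part of the data flow itself):

# Rows of source that exist in target (the "Exists" option)
spark.sql("""
    SELECT s.*
    FROM source s
    WHERE EXISTS (SELECT 1 FROM target t WHERE t.depid = s.depid)
""").show()

# Rows of source that do not exist in target (the "Doesn't Exist" option)
spark.sql("""
    SELECT s.*
    FROM source s
    WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.depid = s.depid)
""").show()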
Create a Data Flow
Create a Source
Create a DerivedColumn Transformation
The expression used is: sha2(256, columns())
Create target and derivedColumn transformation
Create the target the same way as the source. To keep the data types the same so that we can use hash values to compare, I add a Cast transformation;
then, the same as the source setting, add a DerivedColumn transformation.
Exists Transformation to compare Source and target
Add an Exists transformation to compare source and target.
The Exists transformation offers two options: Exists and Doesn't Exist. It supports multiple criteria and custom expressions.
Configuration
Choose which data stream you’re checking for existence in the Right stream dropdown.
Specify whether you’re looking for the data to exist or not exist in the Exist type setting.
Select whether or not you want a Custom expression.
Choose which key columns you want to compare as your exists conditions. By default, data flow looks for equality between one column in each stream. To compare via a computed value, hover over the column dropdown and select Computed column.
“Exists” option
Now, let's use the “Exists” option.
We get depid = 1004: it exists.
Doesn’t Exist
Use the “Doesn't Exist” option.
We get depid = 1003: “wholesale” exists on the source side but does NOT exist in the target.
Recap
The “Exists Transformation” is similar to SQL WHERE EXISTS and SQL WHERE NOT EXISTS.
It is very convenient for comparisons in data engineering projects, e.g., ETL source-to-target comparison.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
When we perform data integration and ETL processes, the most effective approach is to read only the source data that has changed since the last time the pipeline ran, rather than querying the entire dataset on each run.
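As a conceptual sketch of that idea (the table, column, and connection details below are placeholders, not part of this article's setup), an incremental read driven by a stored watermark looks like this:

# Conceptual sketch only: read just the rows changed since the last successful run,
# tracked by a watermark value (in practice loaded from a control/watermark table).
last_watermark = "2024-01-01 00:00:00"   # placeholder value

df_changed = (spark.read
    .format("jdbc")
    .option("url", "jdbc:sqlserver://<server>:1433;databaseName=<db>")   # placeholder connection
    .option("query", f"SELECT * FROM dbo.orders WHERE last_modified > '{last_watermark}'")
    .option("user", "<user>")
    .option("password", "<password>")
    .load())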
We will explore the different Change Data Capture (CDC) capabilities (CDC in Mapping Data flow, Top level CDC in ADF, Synapse link) available in Azure Data Factory and Azure Synapse Analytics.
Supported data sources and targets
Currently, ADF supports the following data sources and targets.
Supported data sources
Avro
Azure Cosmos DB (SQL API)
Azure SQL Database
Azure SQL Managed Instance
Delimited Text
JSON
ORC
Parquet
SQL Server
XML
Snowflake
Supported targets
Avro
Azure SQL Database
SQL Managed Instance
Delimited Text
Delta
JSON
ORC
Parquet
Azure Synapse Analytics
Azure Synapse Analytics as Target
When using Azure Synapse Analytics as target, the Staging Settings is available on the main table canvas. Enabling staging is mandatory when selecting Azure Synapse Analytics as the target.
Staging Settings can be configured in two ways: using Factory settings or opting for Custom settings. Factory settings apply at the factory level; if they aren't configured the first time, you'll be directed to the global staging settings section to configure them. Once set, all CDC top-level resources adopt this configuration. Custom settings are scoped only to the CDC resource for which they are configured and override the Factory settings.
Known limitations
Currently, when creating source/target mappings, each source and target is only allowed to be used once.
Complex types are currently unsupported.
Self-hosted integration runtime (SHIR) is currently unsupported.
CDC ADLS to SQL Database
Create a CDC artifact
Go to the Author pane in data factory. Below Pipelines, a new top-level artifact called Change Data Capture (preview) appears.
Configuring Source properties
Use the dropdown list to choose your data source. For this demo, select DelimitedText.
To support Change Data Capture (CDC), it’s recommended to create a dedicated Linked Service, as current implementations use a single Linked Service for both source and target.
You can choose to add multiple source folders by using the plus (+) button. The other sources must also use the same linked service that you already selected.
Configuring target
This demo uses a SQL database and a dedicated Linked Service for CDC.
Configuring the target table
If existing tables at the target have matching names, they’re selected by default under Existing entities. If not, new tables with matching names are created under New entities. Additionally, you can edit new tables by using the Edit new tables button.
The capturing change data studio appears.
Let's click “Column mappings”.
If you want to enable the column mappings, select the mappings and turn off the Auto map toggle. Then, select the Column mappings button to view the mappings. You can switch back to automatic mapping anytime by turning on the Auto map toggle.
Configure CDC latency
After your mappings are complete, set your CDC latency by using the Set Latency button.
Publish and start CDC
After you finish configuring your CDC, select Publish all to publish your changes, then Start to start running your change data capture.
Monitoring CDC
To monitor CDC, we can use either the monitoring page in ADF Studio or the CDC studio.
Once data changes, CDC automatically detects and tracks the changes and delivers them to the target.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
Azure offers several SQL-related services, each tailored to different use cases and requirements. Below is a comparison of Azure SQL Managed Instance, Azure SQL Database, and Azure SQL Server (often referred to as a logical SQL Server in Azure).
1. Azure SQL Database
Description: A fully managed, platform-as-a-service (PaaS) relational database offering. It is designed for modern cloud applications and supports single databases and elastic pools.
Use Cases:
Modern cloud-native applications.
Microservices architectures.
Applications requiring automatic scaling, high availability, and minimal management overhead.
Key Features:
Single database or elastic pools (shared resources for multiple databases).
Automatic backups, patching, and scaling.
Built-in high availability (99.99% SLA).
Serverless compute tier for cost optimization.
Limited SQL Server surface area (fewer features compared to Managed Instance).
Limitations:
No support for SQL Server Agent, Database Mail, or cross-database queries.
Limited compatibility with on-premises SQL Server features.
Management: Fully managed by Microsoft; users only manage the database and its resources.
2. Azure SQL Managed Instance
Description: A fully managed instance of SQL Server in Azure, offering near 100% compatibility with on-premises SQL Server. It is part of the PaaS offering but provides more control and features compared to Azure SQL Database.
Use Cases:
Lift-and-shift migrations of on-premises SQL Server workloads.
Applications requiring full SQL Server compatibility.
Scenarios needing features like SQL Server Agent, cross-database queries, or linked servers.
Key Features:
Near 100% compatibility with SQL Server.
Supports SQL Server Agent, Database Mail, and cross-database queries.
Built-in high availability (99.99% SLA).
Virtual network (VNet) integration for secure connectivity.
Automated backups and patching.
Limitations:
Higher cost compared to Azure SQL Database.
Slightly longer deployment times.
Limited to a subset of SQL Server features (e.g., no Windows Authentication).
Management: Fully managed by Microsoft, but users have more control over instance-level configurations.
3. Azure SQL Server
Description: A logical server in Azure that acts as a central administrative point for Azure SQL Database and Azure SQL Managed Instance. It is not a standalone database service but rather a management layer.
Use Cases:
Managing multiple Azure SQL Databases or Managed Instances.
Centralized authentication and firewall rules.
Administrative tasks like setting up logins and managing access.
Key Features:
Acts as a gateway for Azure SQL Database and Managed Instance.
Supports Azure Active Directory (AAD) and SQL authentication.
Configurable firewall rules for network security.
Provides a connection endpoint for databases.
Limitations:
Not a database service itself; it is a management tool.
Does not host databases directly.
Management: Users manage the server configuration, logins, and firewall rules.
Side-by-Side Comparison

Feature/Aspect          Azure SQL Database           Azure SQL Managed Instance          Azure SQL Server (Logical)
Service Type            Fully managed PaaS           Fully managed PaaS                  Management layer
Compatibility           Limited SQL Server features  Near 100% SQL Server compatibility  N/A (management tool)
Use Case                Cloud-native apps            Lift-and-shift migrations           Centralized management
High Availability       99.99% SLA                   99.99% SLA                          N/A
VNet Integration        Limited (via Private Link)   Supported                           N/A
SQL Server Agent        Not supported                Supported                           N/A
Cross-Database Queries  Not supported                Supported                           N/A
Cost                    Lower                        Higher                              Free (included in service)
Management Overhead     Minimal                      Moderate                            Minimal
Side-by-Side: SQL Server Features Not Available in Azure SQL
The following lists features that SQL Server has but that are not available in Azure SQL Database and Azure SQL Managed Instance.
1. Instance-Level Features

Feature                          SQL Server      Azure SQL Database                   Azure SQL Managed Instance
Multiple Databases Per Instance  ✅ Full support  ❌ Only single database per instance  ✅ Full support
Cross-Database Queries           ✅ Full support  ❌ Limited with Elastic Query         ✅ Full support
SQL Server Agent                 ✅ Full support  ❌ Not available                      ✅ Supported (with limitations)
PolyBase                         ✅ Full support  ❌ Not available                      ❌ Not available
CLR Integration (SQL CLR)        ✅ Full support  ❌ Not available                      ✅ Supported (with limitations)
FileStream/FileTable             ✅ Full support  ❌ Not available                      ❌ Not available
2. Security Features

Feature                                                     SQL Server      Azure SQL Database               Azure SQL Managed Instance
Database Mail                                               ✅ Full support  ❌ Not available                  ✅ Supported
Service Broker                                              ✅ Full support  ❌ Not available                  ❌ Not available
Custom Certificates for Transparent Data Encryption (TDE)   ✅ Full support  ❌ Limited to Azure-managed keys  ❌ Limited customization
3. Integration Services

Feature           SQL Server      Azure SQL Database         Azure SQL Managed Instance
SSIS Integration  ✅ Full support  ❌ Requires external tools  ❌ Requires external tools
SSRS Integration  ✅ Full support  ❌ Not available            ❌ Not available
SSAS Integration  ✅ Full support  ❌ Not available            ❌ Not available
4. Specialized Features

Feature                               SQL Server      Azure SQL Database  Azure SQL Managed Instance
Machine Learning Services (R/Python)  ✅ Full support  ❌ Not available     ❌ Not available
Data Quality Services (DQS)           ✅ Full support  ❌ Not available     ❌ Not available
Conclusion
Azure SQL Database: Ideal for new cloud-native applications or applications that don’t require full SQL Server compatibility.
Azure SQL Managed Instance: Best for migrating on-premises SQL Server workloads to the cloud with minimal changes.
Azure SQL Server (Logical): Used for managing and administering Azure SQL Databases and Managed Instances.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
Built on Apache Spark, Delta Lake provides a robust storage layer for data in Delta tables. Its features include ACID transactions, high-performance queries, schema evolution, and data versioning, among others.
Today’s focus is on how Delta Lake simplifies the management of slowly changing dimensions (SCDs).
A quick review of Slowly Changing Dimension Type 2
A quick recap of SCD Type 2 follows:
Storing historical dimension data with effective dates.
Keeping a full history of dimension changes (with start/end dates).
Adding new rows for dimension changes (preserving history).
# Existing Dimension data
surrokey  depID  dep    StartDate   EndDate     IsActivity
1         1001   IT     2019-01-01  9999-12-31  1
2         1002   HR     2019-01-01  9999-12-31  1
3         1003   Sales  2019-01-01  9999-12-31  1
# Dimension changed and new data comes
depId dep
1003 wholesale <--- depID is same, name changed from "Sales" to "wholesale"
1004 Finance <--- new data
# the new Dimension will be:
surrokey  depID  dep        StartDate   EndDate     IsActivity
1         1001   IT         2019-01-01  9999-12-31  1   <-- No action required
2         1002   HR         2019-01-01  9999-12-31  1   <-- No action required
3         1003   Sales      2019-01-01  2020-12-31  0   <-- mark as inactive
4         1003   wholesale  2021-01-01  9999-12-31  1   <-- add updated active value
5         1004   Finance    2021-01-01  9999-12-31  1   <-- insert new data
Creating demo data
We’re creating a Delta table, dim_dep, and inserting three rows of existing dimension data.
Existing dimension data
%sql
-- Create table dim_dep
create table dim_dep (
  Surrokey BIGINT GENERATED ALWAYS AS IDENTITY
  , depID int
  , dep string
  , StartDate DATE
  , EndDate DATE
  , IsActivity BOOLEAN
)
using delta
location 'dbfs:/mnt/dim/'
-- Insert data
insert into dim_dep (depID,dep, StartDate,EndDate,IsActivity) values
(1001,'IT','2019-01-01', '9999-12-31' , 1),
(1002,'Sales','2019-01-01', '9999-12-31' , 1),
(1003,'HR','2019-01-01', '9999-12-31' , 1)
select * from dim_dep
Surrokey depID dep StartDate EndDate IsActivity
1 1001 IT 2019-01-01 9999-12-31 true
2 1002 Sales 2019-01-01 9999-12-31 true
3 1003 HR 2019-01-01 9999-12-31 true
We perform a left outer join of the source dataframe (df_dim_dep_source) with the target dataframe (df_dim_dep_target), on source depID = target depID and target IsActivity = 1 (i.e., active).
The intent of this join is to not miss any new data coming from the source, and to bring in only the active records from the target, because SCD updates are required only for those rows. After joining source and target, the resulting dataframe is shown below.
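A minimal PySpark sketch of this join (the dataframe and column names are taken from the article; df_dim_dep_source is assumed to already hold the incoming source rows):

from pyspark.sql.functions import col

# Load the target dimension and prefix its columns with "tar_" so source and target
# values can sit side by side after the join.
tar_alias = {"Surrokey": "tar_surrokey", "depID": "tar_depID", "dep": "tar_dep",
             "StartDate": "tar_StartDate", "EndDate": "tar_EndDate", "IsActivity": "tar_IsActivity"}

df_dim_dep_target = (spark.read.format("delta").load("dbfs:/mnt/dim/")
    .select([col(c).alias(a) for c, a in tar_alias.items()]))

# Left outer join: keep every source row; match only active target rows.
df_joined = df_dim_dep_source.join(
    df_dim_dep_target,
    (df_dim_dep_source["depid"] == df_dim_dep_target["tar_depID"])
    & (df_dim_dep_target["tar_IsActivity"] == True),
    how="left_outer",
)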
Step 4: Filter only the non-matched and updated records
In this demo we have only two columns, depid and dep, but in a real development environment there may be many more columns.
Instead of comparing the columns one by one (e.g., src_col1 != tar_col1, src_col2 != tar_col2, …, src_colN != tar_colN), we compute a hash over each side's column combination and compare the hashes. In addition, if a column's data type differs between the two sides, we first convert both to the same type.
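A minimal sketch of this hash comparison (column names assumed from the demo; in practice you would list all attribute columns inside concat_ws):

from pyspark.sql.functions import sha2, concat_ws, col

# Hash the source-side and target-side attribute columns, casting to string first
# so both sides hash the same representation.
df_hashed = (df_joined
    .withColumn("src_hash", sha2(concat_ws("||", col("dep").cast("string")), 256))
    .withColumn("tar_hash", sha2(concat_ws("||", col("tar_dep").cast("string")), 256)))

# Keep only brand-new rows (no target match) or rows whose attributes changed.
df_filtered = (df_hashed
    .filter(col("tar_depID").isNull() | (col("src_hash") != col("tar_hash")))
    .drop("src_hash", "tar_hash"))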
The row depid = 1003 (dep = HR) was filtered out because the source and target sides are the same; no action is required.
The row depid = 1002 has dep changed from “Sales” to “Wholesale” and needs updating.
The row depid = 1004 (Finance) is a brand-new row and needs to be inserted into the target dimension table.
Step 5: Find out records that will be used for inserting
From the discussion above, we know that depid = 1002 needs updating and depid = 1004 is a new record. We will create a new column, merge_key, which will be used for the upsert operation. This column holds the values of the source id.
Add a new column – “merge_key”
df_inserting = df_filtered.withColumn('merge_key', col('depid'))
df_inserting.show()
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
|depid| dep|tar_surrokey|tar_depID|tar_dep|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
| 1002|Wholesale| 2| 1002| Sales| 2019-01-01| 9999-12-31| true| 1002|
| 1004| Finance| null| null| null| null| null| null| 1004|
+-----+---------+------------+---------+-------+-------------+-----------+--------------+---------+
The above 2 records will be inserted as new records to the target table.
Step 6: Find out the records that will be used for updating in target table
from pyspark.sql.functions import lit
df_updating = df_filtered.filter(col('tar_depID').isNotNull()).withColumn('merge_key', lit('None'))
df_updating.show()
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
|depid| dep|tar_surrokey|tar_depID|tar_StartDate|tar_EndDate|tar_IsActivity|merge_key|
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
| 1002|Wholesale|           2|     1002|   2019-01-01| 9999-12-31|          true|     None|
+-----+---------+------------+---------+-------------+-----------+--------------+---------+
The above record will be used for updating the SCD columns in the target table.
This dataframe keeps the records whose tar_depID column is not null, which means the record already exists in the target table and an SCD update has to be done. The merge_key column is set to 'None' here, denoting that this row only requires an update of the SCD columns.
Step 7: Combine the inserting and updating records as a staging dataframe
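A minimal sketch of this final step, assuming the standard Delta Lake MERGE pattern for SCD Type 2 (the path, dataframe, and column names come from the previous steps; this is a common way to finish the upsert, not necessarily the author's exact code):

from delta.tables import DeltaTable

# Stage the two sets together; their column lists differ slightly, so allow missing columns.
df_stage = df_updating.unionByName(df_inserting, allowMissingColumns=True)

dim_dep = DeltaTable.forPath(spark, "dbfs:/mnt/dim/")

(dim_dep.alias("tgt")
    .merge(df_stage.alias("src"),
           "tgt.depID = src.merge_key AND tgt.IsActivity = true")
    .whenMatchedUpdate(set={              # close the old version of a changed department
        "EndDate": "current_date()",
        "IsActivity": "false",
    })
    .whenNotMatchedInsert(values={        # insert brand-new rows and the new version of changed rows
        "depID": "cast(src.depid as int)",
        "dep": "src.dep",
        "StartDate": "current_date()",
        "EndDate": "to_date('9999-12-31')",
        "IsActivity": "true",
    })
    .execute())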
We demonstrated how to implement Slowly Changing Dimension (SCD) Type 2 using Delta Lake, a storage layer that turns data lakes into reliable, high-performance, and scalable repositories. With this approach, organizations can unlock the full potential of their data and make informed decisions with confidence.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
Within the context of enterprise data warehousing, the effective management of historical data is essential for supporting informed business decision-making. Slowly Changing Dimension (SCD) Type 2 is a widely adopted technique for addressing changes in data over time.
A brief overview of Slowly Changing Dimensions Type 2
Slowly Changing Dimensions Type 2 (SCD Type 2) is a common solution for managing historical data. To ensure clarity, I’ll briefly recap SCD Type 2.
A Type 2 SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed, and a new record is created with the changed data values; this new record becomes the current record.
Existing Dimension data
surrokey depID dep IsActivity
1 1001 IT 1
2 1002 HR 1
3 1003 Sales 1
Dimension changed and new data comes
depId dep
1003 wholesale <--- depID is same, name changed from "Sales" to "wholesale"
1004 Finance <--- new data
Mark existing dimensional records as expired (inactive); create a new record for the current dimensional data; and insert new incoming data as new dimensional records.
Now, the new Dimension will be:
surrokey depID dep IsActivity
1 1001 IT 1 <-- No action required
2 1002 HR 1 <-- No action required
3 1003 Sales 0 <-- mark as inactive
4 1003 wholesale 1 <-- add updated active value
5 1004 Finance 1 <-- insert new data
This solution demonstrates the core concepts of a Slowly Changing Dimension (SCD) Type 2 implementation. While it covers the major steps involved, real-world production environments often have more complex requirements. When designing dimension tables (e.g., the dep table), I strongly recommend adding more descriptive columns to enhance clarity. Specifically, including [Start_active_date] and [End_active_date] columns significantly improves the traceability and understanding of dimension changes over time.
Implementing SCD Type 2
Step 1: Create a Dimension Table- dep
-- Create table
create table dep (
surrokey int IDENTITY(1, 1),
depID int,
dep varchar(50),
IsActivity bit);
After inserting data, the dep table contains:
surrokey depID dep IsActivity
1 1001 IT 1
2 1002 HR 1
3 1003 Sales 1
Step 2: Create Data Flow
Add the source dataset. The dataset should point to the file located in your source layer.
We have 2 data rows: depID = 1003 has an updated value, and depID = 1004 is new and needs to be added to the dimension table.
Step 3: Add derived column
Add a Derived Column transformation, add a column named isactive, and set its value to 1.
Step 4: Sink dimension data
Create a dataset pointing to the SQL Server database table dep.
Add a Sink that uses the above dataset, SQLServer_dep_table.
Configure the sink mappings as shown below
Step 5: Add SQL dataset as another source.
Step 6: Rename columns from the database table dep
Use a Select transformation to rename the columns from the SQL table.
Rename the columns:
depID –> sql_depID
dep –> sql_dep
Isactivity –> sql_IsActivity
Step 7: Lookup
Add a Lookup to join the new dimension data that we imported in “srcDep” at Step 2.
At this step, the existing dimension table is left-joined with the newly arrived dimension data (rows that need updated information, or brand-new dimension values).
In the existing dimension data, depID = 1003 previously had dep = “Sales”; now it needs to change to “wholesale”.
Step 8: filter out non-nulls
Add a Filter transformation to keep only the rows that have non-null values in the source file columns.
Filter expression : depID column is not null.
!isNull(depid)
This requires filtering the ‘lkpNeedUpdate’ lookup output to include only rows where the depID is not null.
Step 9: Select the needed columns
The upstream “filterNonNull” outputs more columns than needed; not all of them are required. The objective is to use the new data (containing depid and dep) to update the existing information in the dimension table (specifically sql_depID, sql_dep, and sql_isActivity) and mark the old information as inactive.
Add a Select transformation to pick the columns that we are going to insert or update in the database dimension table.
Step 10: Add a new column and give it the value “0”
Add a Derived Column and set its value to “0”, meaning mark the row as “inactive”.
Step 11: alter row
Add an Alter Row transformation to update the row information.
Configure the alter conditions:
Update 1==1
Step 12: Sink the updated information
We have updated the existing rows, marking them “0” (inactive). It's time to save them into the database dimension table.
Add a Sink pointing to the database dimension table – dep.
Map the columns:
sql_depid ---> depID
sql_dep ---> dep
ActivityStatus ---> IsActivity
Step 13: Adjust Sink order
As there are two sinks, one designated for the source data and the other for the updated data, a specific processing order must be enforced.
Click on a blank area of the canvas; on the “Settings” tab, configure the sink order: 1: sinkUpdated, 2: sinkToSQLDBdepTable.
Step 14: Create a pipeline
Create a pipeline, add this data flow, and run it.
SELECT TOP (5000) [surrokey]
,[depID]
,[dep]
,[IsActivity]
FROM [williamSQLDB].[dbo].[dep]
surrokey  depID  dep        IsActivity
1         1001   IT         1
2         1002   HR         1
3         1003   Sales      0
4         1003   Wholesale  1
5         1004   Finance    1
Conclusion
In conclusion, we have explored Slowly Changing Dimensions Type 2 and provided a comprehensive understanding of how to effectively implement SCD Type 2 in your data warehousing projects, leveraging modern technologies and following industry best practices.
By implementing SCD Type 2 according to Ralph Kimball’s approach, organizations can achieve a comprehensive view of dimensional data, enabling accurate trend analysis, comparison of historical performance, and tracking of changes over time. It empowers businesses to make data-driven decisions based on a complete understanding of the data’s evolution, ensuring data integrity and reliability within the data warehousing environment.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
This article is part of a series dedicated to dynamic ETL Source-to-Target Mapping (STM) solutions, covering both batch and near real-time use cases. The series will explore various mapping scenarios, including one-to-many, many-to-one, and many-to-many relationships, with implementations provided throughout.
You need privileges to create and alter the metadata table.
Scenario
In this article, I will focus on a scenario where the source schema may have new or missing columns, or the destination schema may have columns with different names or might lack columns to accommodate new incoming source fields.
Requirement:
Dynamically handle source variations, map data to a consistent destination schema, and handle missing columns gracefully: give a default value to missing columns, and add new columns to the target DB table when new source fields arrive.
Source:
CSV; the schema varies between executions (columns may be missing, reordered, or new). The current source columns are: name, age, gender, and state.
“name”
The field's name in the file, or the column's name in the DB table.
“type”
Logical Data Type. The abstract or generalized type used by Azure Data Factory (ADF) to interpret data regardless of the underlying system or format. For example, string, integer, double etc.
“physicalType”
The specific type defined by the database or file system where the data resides. For example, VARCHAR, NVARCHAR, CHAR, INT, FLOAT, NUMERIC(18,10), TEXT etc. in database
Each column has a source-to-sink mapping plan of this form; we will concatenate all the columns' mapping plans to generate the complete Source-to-Target Mapping (STM) plan.
Step 2: Creating known field-column mapping plan
For each known field or column, create a Source-to-Target mapping plan, save it in the “mapping” column of the database metadata table, formatted in JSON style string.
Step 4: Reset the activity status of all source fields in the metadata table to False
Save source data name
Since we will process the item's metadata field by field later, it is convenient to save the source data name in a variable.
Add a “Set variable” activity to save the source data name in the variable “var_sourcename”.
Reset all source fields to False
Add a Lookup activity to reset the activity status of all source fields in the metadata table to False.
lookup query:
UPDATE metadata SET
src_col_activity = 0
WHERE source_filename = '@{variables('var_sourcename')}';
SELECT 1;
This is one of the important steps. It allows us to focus on the incoming source fields. When we build the complete ETL Source-to-Target mapping plan, we will utilize these incoming fields.
Step 5: ForEach over the source data fields
Add a ForEach activity to the pipeline, using the “structure” output to iterate over the source data fields one by one.
Save source data field name and data type
In the ForEach activity, add two “Set variable” activities to save the source data field name and data type in variables:
ForEach's @item().name --> var_field_name
ForEach's @item().type --> var_field_type
Lookup source fields in metadata table
Continuing in the ForEach activity, add a Lookup activity and create a dataset pointing to the metadata table.
Lookup query:
IF NOT EXISTS (
    SELECT src_col FROM metadata
    WHERE source_filename = '@{variables('var_sourcename')}'
      AND src_col = '@{variables('var_field_name')}'
)
BEGIN
    -- Alter target table schema
    IF NOT EXISTS (
        SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = 'emp'
          AND COLUMN_NAME = '@{variables('var_field_name')}'
    )
        ALTER TABLE emp ADD @{item().name} NVARCHAR(max);
    SELECT 'target altered'; -- return

    -- insert field metadata and STM plan
    INSERT INTO metadata (
        source_filename, src_col, src_dataType, src_col_activity,
        destination_schema, destination_table, dst_col, dst_dataType,
        dst_col_activity, mapping)
    VALUES (
        '@{variables('var_sourcename')}'
        , '@{variables('var_field_name')}'
        , '@{variables('var_field_type')}'
        , 1
        , 'dbo'
        , 'emp'
        , '@{variables('var_field_name')}'
        , 'NVARCHAR'
        , 1
        , '{
            "source": {
                "name": "@{variables('var_field_name')}",
                "type": "@{variables('var_field_type')}",
                "physicalType": "@{variables('var_field_type')}"
            },
            "sink": {
                "name": "@{variables('var_field_name')}",
                "type": "nvarchar(max)",
                "physicalType": "nvarchar(max)"
            }
        }'
    );
    SELECT 'insert field metadata'; -- return
END
ELSE
BEGIN
    UPDATE metadata SET src_col_activity = 1
    WHERE source_filename = '@{variables('var_sourcename')}'
      AND src_col = '@{variables('var_field_name')}';
    SELECT 'this field actived'; -- return
END;
Check if the current source field exists in the ‘metadata’ table. If the field’s name is found, update its activity status to True as an existing field. If the field’s name is not present, it indicates a new field. Insert this new field into the metadata table and establish its mapping plan to specify its intended destination.
Check the target table [emp] to verify if the column exists. If the column is not present, alter the schema of the target table [emp] to add a new column to the destination table.
The target table schema is altered.
The metadata for the new field, “state”, is inserted into the metadata table.
Having established the dynamic mapping plan, we are now prepared to ingest data from the source and deliver it to the target. All preceding steps were dedicated to the development of the ETL mapping plan.
Copy activity: Applying the STM mapping plan
Add a “Copy activity”, using Source and Sink dataset we built previous.
Switch to the “Mapping” tab, click “Add dynamic content”, and write the expression:
@json(variables('var_mapping_plan'))
Done !!!
Afterword
This article focuses on explaining the underlying logic of dynamic source-to-target mapping through a step-by-step demonstration. To clearly illustrate the workflow and logic flow, four “Set Variable” activities and four pipeline variables are included. However, in a production environment, these are not required.
Having demonstrated dynamic source-to-target mapping with all necessary logic flow steps, this solution provides a foundation that can be extended to other scenarios, such as one-to-many, many-to-one, and many-to-many mappings. Implementations for these scenarios will be provided later.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca (remove all space from the email account 😊)
The Get Metadata activity in Azure Data Factory (ADF) is used to retrieve metadata about a file, folder, or database. This activity is particularly useful when you need to dynamically determine properties like file name, size, structure, or existence and use them in subsequent pipeline activities.
We can specify the following metadata types in the Get Metadata activity field list to retrieve the corresponding information:
itemName: Name of the file or folder.
itemType: Type of the file or folder. Returned value is File or Folder.
size: Size of the file, in bytes. Applicable only to files.
created: Created datetime of the file or folder.
lastModified: Last modified datetime of the file or folder.
childItems: List of subfolders and files in the given folder. Applicable only to folders. Returned value is a list of the name and type of each child item.
contentMD5: MD5 of the file. Applicable only to files.
structure: Data structure of the file or relational database table. Returned value is a list of column names and column types.
columnCount: Number of columns in the file or relational table.
exists: Whether a file, folder, or table exists. If exists is specified in the Get Metadata field list, the activity won't fail even if the file, folder, or table doesn't exist. Instead, exists: false is returned in the output.
Metadata structure and columnCount are not supported when getting metadata from Binary, JSON, or XML files.
Wildcard filter on folders/files is not supported for Get Metadata activity.
Select the Get Metadata activity on the canvas if it is not already selected, then open its Settings tab to edit its details.
Sample setting and output
Get a folder’s metadata
Setting
Select a dataset or create a new one.
For a folder's metadata, the fields we can select in the Field list setting are the ones that apply to folders, such as itemName, itemType, childItems, created, lastModified, and exists.
The Get Metadata activity in Azure Data Factory (ADF) is a versatile tool for building dynamic, efficient, and robust pipelines. It plays a critical role in handling real-time scenarios by providing essential information about data sources, enabling smarter workflows.
Use Case Scenarios Recap
File Verification: Check if a file exists or meets specific conditions (e.g., size or modification date) before processing.
Iterative Processing: Use folder metadata to dynamically loop through files using the ForEach activity.
Schema Validation: Fetch table or file schema for use in dynamic transformations or validations.
Dynamic Path Handling: Adjust source/destination paths based on retrieved metadata properties.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca (remove all space from the email account 😊)
PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data from multiple formats, such as CSV, JSON, Parquet, ORC, and databases (JDBC), as well as more advanced formats like Avro, Delta Lake, and Hive.
By default, if you try to write directly to a file (e.g., name1.csv), it conflicts, because Spark doesn't generate a single file but a collection of part files in a directory.
PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.
In this case, name1.csv in “dbfs:/FileStore/name1.csv” is treated as a directory rather than a file name; PySpark writes the single part file inside it with a generated name like part-00000-<id>.csv.
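For reference, a write along these lines (the dataframe name df is assumed) is what produces that directory:

# Write the dataframe as a single part file under dbfs:/FileStore/name1.csv (a directory)
(df.coalesce(1)
   .write.format("csv")
   .option("header", "true")
   .mode("overwrite")
   .save("dbfs:/FileStore/name1.csv"))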
We then need an additional step: rename the file to name1.csv.
# List all files in the output directory (Spark wrote name1.csv as a directory)
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")

# Filter for the part file
source_file = None
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        break

# Move the part file out under a temporary name, remove the directory, then rename
dbutils.fs.mv(source_file, "dbfs:/FileStore/name1_tmp.csv")
dbutils.fs.rm("dbfs:/FileStore/name1.csv", True)
dbutils.fs.mv("dbfs:/FileStore/name1_tmp.csv", "dbfs:/FileStore/name1.csv")

display(dbutils.fs.ls("dbfs:/FileStore/"))
Direct Write with Custom File Name
PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:
Steps:
Use coalesce(1) to combine all data into a single partition.
Save the file to a temporary location.
Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("dbfs:/FileStore/temp_folder")
# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break
# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")
# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Sample data is saved at /tmp/output/people.parquet.
df1=spark.read.parquet("/tmp/output/people.parquet")
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| James | | Smith|36636| M| 3000|
| Michael | Rose| |40288| M| 4000|
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
Reading a Parquet file is the same as reading a CSV; nothing special.
Write to Parquet
append: appends the data from the DataFrame to the existing files if the destination already exists. If the destination files do not exist, it creates a new Parquet file in the specified location.
overwrite: This mode overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.
ignore: If the destination Parquet file already exists, this mode does nothing and does not write the DataFrame to the file. If the file does not exist, it creates a new Parquet file.
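A minimal sketch using the dataframe read above (the output path is assumed):

# Write the dataframe back out as Parquet with an explicit save mode
df1.write.mode("overwrite").parquet("/tmp/output/people_copy.parquet")

# Or append to an existing Parquet location
df1.write.mode("append").parquet("/tmp/output/people_copy.parquet")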
Pay attention to .option("multiline", "true"). My JSON file spans multiple lines (see the sample data), so if you read it without this option, the load still runs, but the dataframe will not work; as soon as you call show(), you will get this error:
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
Reading from Multiline JSON (JSON Array) File
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
df_jsonarray_simple = spark.read\
.format("json")\
.option("multiline", "true")\
.load("dbfs:/FileStore/jsonArrary.json")
df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
| City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR| 2| PR| STANDARD| 704|
| BDA SAN LUIS| 10| PR| STANDARD| 709|
+-------------------+------------+-----+-----------+-------+
Read complex JSON
df_complexjson=spark.read\
.option("multiline","true")\
.json("dbfs:/FileStore/jsonArrary2.json")
df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
id | 0001
type | donut
name | Cake
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}]
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
id | 0002
type | donut
name | Raised
ppu | 0.55
batters | {[{1001, Regular}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
id | 0003
type | donut
name | Old Fashioned
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}]}
topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()
Write to JSON
Options
path: Specifies the path where the JSON files will be saved.
mode: Specifies the behavior when writing to an existing directory.
compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy"). df2.write.option("compression", "gzip")
dateFormat: Specifies the format for date columns. df.write.option("dateFormat", "yyyy-MM-dd")
timestampFormat: Specifies the format for timestamp columns. df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
lineSep: Specifies the character sequence to use as a line separator between JSON objects: \n (Unix/Linux newline) or \r\n (Windows newline). df.write.option("lineSep", "\r\n")
encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings. df.write.option("encoding", "UTF-8")
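Putting a few of these options together, a minimal sketch (the output path is assumed, and df2 is the dataframe used in the save-mode example below):

# Write df2 as gzip-compressed JSON with an explicit date format
(df2.write
    .mode("overwrite")
    .option("compression", "gzip")
    .option("dateFormat", "yyyy-MM-dd")
    .json("/tmp/spark_output/zipcodes_gzip.json"))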
Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
In the above example, it reads the entire table into a PySpark DataFrame. Sometimes you may not need the entire table; to select specific columns, specify the query you want with the query option (or pass a subquery to the dbtable option).
Append: mode("append") to append the rows to the existing SQL Server table.
# We have defined these variables before; shown here again
server_name = "mainri-sqldb.database.windows.net"
port = 1433
username = "my login name"
password = "my login password"
database_name = "mainri-sqldb"
table_name = "dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
The mode("overwrite") by default drops the table if it already exists and re-creates a new one without indexes. Use option("truncate", "true") to truncate the existing table instead, which retains the indexes.
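A minimal sketch using the variables above (the dataframe df and connection details are assumed):

# Append the dataframe rows to the existing SQL Server table.
# Switch to .mode("overwrite") plus .option("truncate", "true") to replace the data
# while keeping the table definition and indexes.
(df.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table_name)
    .option("user", username)
    .option("password", password)
    .mode("append")
    .save())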
PySpark jdbc() method with the option numPartitions you can read the database table in parallel. This option is used with both reading and writing.
The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
# Select columns with where clause
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("query","select id,age from employee where gender='M'") \
.option("numPartitions",5) \
.option("user", "root") \
.option("password", "root") \
.load()
Using fetchsize with numPartitions to Read
The fetchsize is another option, used to specify how many rows to fetch at a time; by default it is set to 10. The JDBC fetch size determines how many rows to retrieve per round trip, which helps the performance of JDBC drivers. Do not set this to a very large number, as you might see issues.
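A minimal sketch (connection details assumed, mirroring the example above) that combines partitioned reading with fetchsize:

# Read the table in parallel across 5 partitions on the numeric "id" column,
# fetching 1000 rows per round trip.
df = (spark.read
    .format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/emp")
    .option("dbtable", "employee")
    .option("partitionColumn", "id")
    .option("lowerBound", 1)
    .option("upperBound", 10000)
    .option("numPartitions", 5)
    .option("fetchsize", 1000)
    .option("user", "root")
    .option("password", "root")
    .load())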
# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")
# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")
# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")
# Insert into sampleTable using the sampleView.
spark.sql("INSERT INTO TABLE ct.sampleTable SELECT * FROM sampleView")
# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca