Understanding how failures in individual activities affect the pipeline as a whole is crucial for building robust data workflows.
People who switch from SSIS to Azure Data Factory (ADF) or Azure Synapse Analytics (ASA) are often confused by the “pipeline logical failure mechanism”. ADF or ASA pipeline orchestration allows conditional logic and lets the user take a different path based on the outcome of a previous activity. Using different paths allows users to build robust pipelines and incorporate error handling into ETL/ELT logic.
ADF or ASA activity outcome paths
An ADF or ASA activity has 4 outcome paths in total: Upon Success, Upon Failure, Upon Completion, and Upon Skip.
A pipeline can have multiple activities that can be executed in sequence or in parallel.
Sequential Execution: Activities are executed one after another.
Parallel Execution: Multiple activities run simultaneously.
You can add multiple branches after an activity. For each pipeline run, at most one path is activated, based on the execution outcome of that activity.
Error Handling Mechanism
When an activity fails within a pipeline, several mechanisms can be used to handle the failure.
In most cases, pipelines are orchestrated in a parallel, serial, or mixed model. The key point is understanding what happens in the parallel and serial models.
From the upstream activity's point of view, the basic principles are:
Multiple dependencies with the same source are logical “OR”
Multiple dependencies with different sources are logical “AND”
Different error handling mechanisms lead to different pipeline statuses: some pipelines fail while others succeed. Pipeline success and failure are determined as follows:
Evaluate the outcome of every leaf activity. If a leaf activity was skipped, evaluate its parent activity instead.
The pipeline result is success if and only if all evaluated nodes succeed.
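The two rules above can be sketched in a few lines of Python. This is only an illustrative model, not how ADF or ASA is implemented: the `Activity` class, the status strings, and the helper names are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Activity:
    # Hypothetical model of an activity after a pipeline run.
    name: str
    status: str  # "Succeeded", "Failed" or "Skipped"
    parents: List["Activity"] = field(default_factory=list)


def nodes_to_evaluate(leaf: Activity) -> List[Activity]:
    """A skipped leaf is replaced by its parent(s), recursively."""
    if leaf.status != "Skipped":
        return [leaf]
    evaluated: List[Activity] = []
    for parent in leaf.parents:
        evaluated.extend(nodes_to_evaluate(parent))
    return evaluated


def pipeline_succeeded(leaves: List[Activity]) -> bool:
    """The pipeline succeeds only if every evaluated node succeeded."""
    evaluated = [node for leaf in leaves for node in nodes_to_evaluate(leaf)]
    return all(node.status == "Succeeded" for node in evaluated)


# Only an Upon Failure branch: the handler is the single leaf.
copy = Activity("Copy", "Failed")
handler = Activity("Handle error", "Succeeded", [copy])
print(pipeline_succeeded([handler]))  # True

# Upon Failure and Upon Success branches: the success branch is a skipped
# leaf, so its failed parent is evaluated instead.
next_step = Activity("Next step", "Skipped", [copy])
print(pipeline_succeeded([handler, next_step]))  # False
```

The second case shows why adding an Upon Success branch changes the result: the skipped leaf pulls the failed upstream activity into the evaluation.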
Let us discuss in detail.
Multiple dependencies with the same source
This resembles a “serial” or “sequential” model.
How “serial” pipeline failure is determined
As we develop more complicated and resilient pipelines, we sometimes need to introduce conditional execution into our logic: execute a certain activity only if certain conditions are met. If one or more activities fail while the others succeed, what is the status of the entire pipeline? Success? Failure? How is pipeline failure determined?
ADF/ASA takes its own approach here. Software engineers are used to the customary forms “if … then … else …” and “try … catch …”, so let’s use the developer’s idiom.
Scenario 1:
Try … Catch …
Single upstream activity (serial model) with multiple downstream activities
Upstream activity failed >> downstream activity succeeded >> pipeline succeeded
Scenario 2:
If … then … Else …
The pipeline defines both the Upon Failure and Upon Success paths. With this approach the pipeline fails, even if the Upon Failure path succeeds.
Both success and failure paths
Upstream activity failed >> failure-path activity succeeded >> pipeline failed
Both success and failure paths
Upstream activity failed >> failure-path activity failed >> pipeline failed
Scenario 3:
If … Skip … Else …
Success, failure, and skip paths
Upstream activity succeeded >> downstream activity succeeded >> skip path skipped >> pipeline succeeded
Multiple dependencies with different sources
This resembles a “parallel” model; its logic is “AND”.
Scenario 4:
Upstream activity 1 succeeded and upstream activity 2 succeeded >> downstream activity succeeded >> pipeline succeeded.
Upstream activity 1 succeeded and upstream activity 2 failed >> downstream activity succeeded >> pipeline succeeded.
Note that “Set variable failed” is connected through the “fail” path.
That means:
The dependency on “set variable success” is true.
Although the “set variable failed” activity itself failed, its dependency is also true, because it uses the “fail” path.
So both dependencies, on “set variable success” and on “set variable failed”, are true.
The pipeline shows “success”.
Now, let’s try this:
Connect “Set variable failed” through the “success” path and see what the pipeline shows: the pipeline fails.
Why? Because the “Set variable failed” dependency is no longer true, even though the “set variable success” dependency is true. True AND False = False. The downstream activity “set variable act” is skipped and never runs, so the pipeline fails.
As you might have realized, once “Set variable failed” is connected through the “complete” path, the dependency holds whether the activity succeeds or fails, so the downstream activity “set variable act” is not skipped and the pipeline shows success.
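The “AND” behaviour of Scenario 4 can be sketched as a small truth check. This is an illustrative model only; the function names are hypothetical, and the condition names follow the values used for dependency conditions in pipeline JSON ("Succeeded", "Failed", "Skipped", "Completed").

```python
def condition_met(upstream_status: str, condition: str) -> bool:
    """Is one dependency satisfied, given how the upstream activity ended?"""
    if condition == "Completed":
        # Upon Completion holds whether the upstream activity succeeded or failed.
        return upstream_status in ("Succeeded", "Failed")
    return upstream_status == condition


def downstream_runs(dependencies) -> bool:
    """Dependencies on different sources are combined with logical AND."""
    return all(condition_met(status, cond) for status, cond in dependencies)


ok, bad = "Succeeded", "Failed"  # "set variable success" / "set variable failed"

print(downstream_runs([(ok, "Succeeded"), (bad, "Failed")]))     # True
print(downstream_runs([(ok, "Succeeded"), (bad, "Succeeded")]))  # False
print(downstream_runs([(ok, "Succeeded"), (bad, "Completed")]))  # True
```

The three calls correspond to the “fail”, “success”, and “complete” path variants discussed above.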
Error Handling
Sample error handling patterns
This pattern is equivalent to a try-catch block in coding. An activity in a pipeline might fail. When it fails, you need to run an error handling job to deal with the failure. However, a single activity failure shouldn’t block the next activities in the pipeline. For instance, suppose I run a copy job that moves files into storage, and it fails halfway through. In that case, I want to delete the partially copied, unreliable files from the storage account (my error handling step), but I’m OK to proceed with other activities afterwards.
To set up the pattern:
Add first activity
Add error handling to the UponFailure path
Add second activity, but don’t connect to the first activity
Connect both UponFailure and UponSkip paths from the error handling activity to the second activity
The error handling job runs only when the first activity fails. The next activity runs regardless of whether the first activity succeeds.
Generic error handling
Suppose multiple activities run sequentially in the pipeline. If any of them fails, I need to run an error handling job to clear the state and/or log the error.
For instance, I have sequential copy activities in the pipeline. If any of them fails, I need to run a script job to log the pipeline failure.
To set up the pattern:
Build sequential data processing pipeline
Add generic error handling step to the end of the pipeline
Connect both Upon Failure and Upon Skip paths from the last activity to the error handling activity
The last step, Generic Error Handling, runs only if one of the previous activities fails. It does not run if they all succeed.
You can add multiple activities for error handling.
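In pipeline JSON, the paths of the generic error handling pattern end up as dependency conditions on the error handling activity. The sketch below builds such a fragment as a Python dictionary; the activity names and the "Script" type are placeholders chosen for illustration, and the fragment is not a complete activity definition.

```python
import json


def error_handler_fragment(last_activity_name: str) -> dict:
    """Build a partial activity definition that depends on the last activity
    through both the Failed and Skipped conditions (Upon Failure, Upon Skip)."""
    return {
        "name": "Generic Error Handling",  # hypothetical activity name
        "type": "Script",                  # placeholder activity type
        "dependsOn": [
            {
                "activity": last_activity_name,
                "dependencyConditions": ["Failed", "Skipped"],
            }
        ],
    }


print(json.dumps(error_handler_fragment("Copy step 3"), indent=2))
```

Listing both conditions on the same source activity is the logical “OR” case: the handler runs if the last activity failed, or if it was skipped because an earlier activity failed.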
Summary
Handling activity failures effectively is crucial for building robust pipelines in Azure Data Factory. By employing retry policies, conditional paths, and other error-handling strategies, you can ensure that your data workflows are resilient and capable of recovering from failures, minimizing the impact on your overall data processing operations.
If you have any questions, please do not hesitate to contact me at william. chen @mainri.ca (remove all spaces from the email address 😊)
A Service Principal in Azure is an identity used by applications, services, or automated tools to access specific Azure resources. It’s tied to an Azure App Registration and is used for managing permissions and authentication.
The Microsoft identity platform performs identity and access management (IAM) only for registered applications. Whether it’s a client application such as ADF or Synapse, a web application, or a mobile app, or a web API that backs a client app, registering it establishes a trust relationship between your application and the identity provider, the Microsoft identity platform.
This article covers registering an application in the Microsoft Entra admin center. I outline the registration procedure step by step.
Summary steps:
Navigate to Microsoft Entra ID (formerly Azure Active Directory)
Create an App Registration
Generate a client secret, and note down the Application (client) ID, the Directory (tenant) ID, and the client secret value.
Use the Service Principal: assign roles to it. Navigate to the Azure resource (e.g., Storage Account, Key Vault, SQL Database) you want your Service Principal to access.
Step by Step Demo
Register a new application in Microsoft Entra ID (formerly called Azure Active Directory) to get an Application ID and a Client Secret value.
Azure Portal >> Microsoft Entra ID (formerly called Azure Active Directory)
(1) Copy Tenant ID.
We need this Tenant ID later.
(2) App Registration
(3) Copy Application ID. We will use it later.
(4) Create Client Secret
Generate a new client secret.
(5) Copy the Client Secret Value
Copy the client secret value; we need it later.
Important: copy the client secret value immediately and store it in a secure place, because the value will not be shown again.
(6) Using the Service Principal – Assign Roles to the Service Principal
Now, assign permissions to your Service Principal so it can access specific Azure resources:
Navigate to the Azure resource (e.g., Storage Account, Key Vault, SQL Database) you want your Service Principal to access.
Go to Access Control (IAM).
Click Add and choose Add role assignment.
Choose a role (e.g., Contributor, Reader, or a custom role).
Search for your App Registration by its name and select it.
Save
We have now finished everything in Microsoft Entra ID (formerly Azure Active Directory).
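Once the tenant ID, application (client) ID, and client secret value are noted down, an application can use them in an OAuth 2.0 client credentials request to the Microsoft identity platform. The following is a minimal sketch that only builds the request; the function name and the example scope are assumptions for illustration, and sending the request (for example with `urllib.request`) is left out.

```python
from urllib.parse import urlencode


def build_token_request(tenant_id: str, client_id: str,
                        client_secret: str, scope: str):
    """Return the token endpoint URL and the form-encoded body for a
    client credentials request. Nothing is sent over the network here."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    })
    return url, body


# Placeholder values; use the IDs and secret copied during registration.
url, body = build_token_request(
    "<tenant-id>", "<client-id>", "<client-secret>",
    "https://storage.azure.com/.default",  # example scope, an assumption
)
print(url)
```

In practice the secret would be read from a secure store such as Azure Key Vault rather than written in code.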
In this article I provide a fully metadata-driven solution that uses Azure Data Factory (ADF) or Synapse Analytics (ASA) to incrementally copy multiple data sets at one time from SharePoint Online (SPO) and sink them to ADLS Gen2.
Previously, I published articles about ADF or ASA working with SPO. If you are interested in specific topics, please see those related articles, or email me at william . chen @ mainri.ca (please remove the spaces from the email address 🙂).
Scenario and Requirements
Metadata driven. All metadata is saved in a SharePoint list, such as TenantID, ClientID, SPO site name, ADLS account, inspecting offset days for incremental loading, etc.
Initial full load, then monitor the data set status; once a data set is updated, load it incrementally, for example on a daily basis.
Solution
Retrieve metadata >> retrieve the “client secret” >> request an access token >> inspect and generate the list of data of interest >> iteratively copy the data of interest and sink it to the destination storage.
Prerequisite
Register SharePoint (SPO) application in Azure Active Directory (AAD).
Grant SPO site permission to registered application in AAD.
Provision Resource Group, ADF (ASA) and ADLS in your Azure Subscription.
I cover these in detail in other articles; please review them if needed.
Let us begin, step by step. I will mention the key points for every step.
Step 1:
Use a Lookup activity to retrieve all metadata from SharePoint Online
First, create a SharePoint Online List type Linked Service, SPO_ITDataGovernanceWorkgroup. You need to provide:
SharePoint site URL. Replace it with your own; it looks like https://[your_domain].sharepoint.com/sites/[your_site_name]. Mine is https://mainri.sharepoint.com/sites/ITDataGovernanceWorkgroup.
Tenant ID. The tenant ID under which your application resides. You can find it from Azure portal Microsoft Entra ID (formerly called Azure AD) overview page.
Service principal ID (Client ID). The application (client) ID of your application registered in Microsoft Entra ID. Make sure to grant SharePoint site permission to this application.
Service principal key. The client secret of your application registered in Microsoft Entra ID. Mine is saved in Azure Key Vault.
Second, using the above Linked Service, create a SharePoint type Dataset, DS_SPO_ITDataGovernanceWorkgroup, and parameterize the dataset.
I name the parameter “List”. This parameter lets the Lookup activity know where your metadata resides (mine is called SourceToLanding).
Now, we are ready to configure the Lookup activity
Source dataset: use above created dataset – DS_SPO_ITDataGovernanceWorkgroup
Query: @concat('$filter=SystemName eq ''',pipeline().parameters.System,''' and StatusValue eq ''Active''')
This query filters for the SPO list items of interest, where my metadata is saved. My metadata list in SPO looks like this
This Lookup activity returns the metadata retrieved from the SPO list. It looks like this.
The response is in JSON format. My “SourceParameterJSON” value is a string, but it matches JSON format well and can be converted to JSON.
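The string-to-JSON conversion is the same idea as parsing a JSON string in any language. The Python sketch below mimics it with a made-up Lookup output: the two keys shown appear in the expressions used in this article, but the values and the overall shape of the sample are assumptions for illustration.

```python
import json

# Hypothetical Lookup output: SourceParameterJSON arrives as a string.
lookup_output = {
    "value": [
        {
            "SourceParameterJSON": (
                '{"pl_par_Tenant_id": "<tenant-id>", '
                '"pl_Inspecting_Offset_Day": -1}'
            )
        }
    ]
}

raw = lookup_output["value"][0]["SourceParameterJSON"]
print(type(raw).__name__)   # str: properties cannot be addressed yet

params = json.loads(raw)    # plays the role of json(...) in the pipeline expression
print(params["pl_par_Tenant_id"])
print(params["pl_Inspecting_Offset_Day"])
```

Until the string is parsed, a property such as pl_par_Tenant_id cannot be read from it, which is why every later expression wraps the value in json(...).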
Step 2:
Get Client Secret
To get SPO access token, we need: Tenant ID, Client ID, and Client Secret
Tenant ID: You can get this from Microsoft Entra ID (formerly Azure Active Directory).
Client ID: When you register your application in Microsoft Entra ID, it generates an ID called the Application ID (Client ID). You can get this from Azure Portal >> Entra ID.
Client Secret: After you registered your application in Microsoft Entra ID, you created a client secret there and immediately copied and kept its value. The value does not reappear after that.
As mentioned above, my client secret is saved in Azure Key Vault. To get it, use a Web Activity to retrieve the client secret (if you have the client secret on hand, you can skip this activity and move directly to the next activity, Get SPO Token).
Method: GET
Authentication: System Assigned Managed Identity
Resource: https://vault.azure.net
URL:
@{concat( concat( json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_KeyVault_URL , '/secrets/' ) , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_keyVault_SecretsName , '?api-version=7.0' )}
Attention: as you can see from the URL content above, the SourceParameterJSON section matches JSON format well, BUT it is NOT JSON; it is a string, so I convert the string to JSON. ?api-version=7.0 is another key point: you must add it to your URL.
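The URL expression above simply joins three parts: the Key Vault URL, the secret name, and the API version. A small Python sketch of the same concatenation follows; the function name and the sample values are hypothetical, and the trailing-slash handling is an addition that the pipeline expression does not have.

```python
def key_vault_secret_url(vault_url: str, secret_name: str,
                         api_version: str = "7.0") -> str:
    """Build the 'get secret' URL: <vault>/secrets/<name>?api-version=<version>."""
    # rstrip avoids a double slash if the stored vault URL ends with "/".
    return f"{vault_url.rstrip('/')}/secrets/{secret_name}?api-version={api_version}"


# Hypothetical vault and secret names.
print(key_vault_secret_url("https://my-vault.vault.azure.net", "spo-client-secret"))
# https://my-vault.vault.azure.net/secrets/spo-client-secret?api-version=7.0
```

Without the api-version query parameter the request is rejected, which is why it is called out as a key point above.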
Create a Web Activity to get the access token from SharePoint Online:
URL: https://accounts.accesscontrol.windows.net/[Tenant-ID]/tokens/OAuth/2. Replace the tenant ID.
Mine looks like: @{concat('https://accounts.accesscontrol.windows.net/' ,json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id ,'/tokens/OAuth/2' )}
Body: replace the client ID (application ID), client secret (application key), tenant ID, and tenant name (of the SharePoint tenant).
Mine looks like: @{concat('grant_type=client_credentials&client_id=' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Client_id , '@' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id ,'&client_secret=' , activity('Get_Client_Secret').output.value , '&resource=00000003-0000-0ff1-ce00-000000000000/infrastructureontario.sharepoint.com@' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id )}
Attention: As mentioned in Step 2, pay attention to the JSON conversion of the “SourceParameterJSON” section.
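The token URL and request body above follow a fixed pattern, which may be easier to read outside the expression language. The Python sketch below mirrors the same concatenation; the function name and the sample values are hypothetical, and like the pipeline expression it does not URL-encode the secret, which may be needed if the secret contains special characters.

```python
SPO_RESOURCE_ID = "00000003-0000-0ff1-ce00-000000000000"  # from the expression above


def build_spo_token_request(tenant_id: str, client_id: str,
                            client_secret: str, tenant_name: str):
    """Return the token URL and request body used to ask for an SPO access token."""
    url = f"https://accounts.accesscontrol.windows.net/{tenant_id}/tokens/OAuth/2"
    body = (
        "grant_type=client_credentials"
        f"&client_id={client_id}@{tenant_id}"
        f"&client_secret={client_secret}"
        f"&resource={SPO_RESOURCE_ID}/{tenant_name}.sharepoint.com@{tenant_id}"
    )
    return url, body


# Placeholder values for illustration only.
url, body = build_spo_token_request("<tenant-id>", "<client-id>", "<secret>", "mainri")
print(url)
print(body)
```

Note that the tenant ID appears three times: in the URL, after the client ID, and after the resource.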
Create a linked service called “ls_SPO_DnA_http_baseUrl” and parameterize it.
Provide:
SharePoint site URL. Pattern: https://<your domain>.sharepoint.com/sites/<SPO site Name>. Replace <your domain> and <SPO site Name>. Mine looks like: https://mainri.sharepoint.com/sites/dataengineering
Tenant ID
Service principal ID (also called client ID or application ID)
2. Create a SharePoint List type Dataset called “ds_DnA_spo_sources_array” and parameterize the dataset.
Linked service: ls_SPO_DnA_http_baseUrl (you just created)
Parameter:
ds_DnA_spoList_Name
ds_par_spo_site
The dataset looks like this
3. Configure the Lookup Activity
Use the results returned by upstream activities to configure the Lookup activity.
source dataset: ds_DnA_spo_sources_array
Dynamic content: ds_DnA_spoList_Name: @json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_DnA_spoList_name
ds_par_spo_site: @json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_SPO_site
Query: I ingest data incrementally, so I use a query to filter for the items of interest only.
@concat('$filter=ContentType eq ','''Document''' , ' and ' ,'Modified ge datetime''' ,formatDateTime(addDays(utcNow() ,json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_Inspecting_Offset_Day),'yyyy-MM-dd'), '''')
The value of ‘GetFileByServerRelativeUrl’ is our key point of work. It points to the specific URL location of the dataset. In fact, all upstream work efforts are aimed at generating specific content for this link item!
Cheers, we are almost there!
2. Create an HTTP Binary source dataset called “ds_spo_src” and parameterize it
Parameters: RelativeURL, ls_par_SPO_site
Linked Service: ls_SPO_HttpServer, which we just created.
3. Configure copy activity’s “Source”
Request method: GET
Source dataset: ds_spo_src
RelativeURL:
@{ concat( item().Path ,'/' ,item().Name ) }
ls_par_SPO_site:
@json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_SPO_site
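To show how the site URL and the relative URL (path plus file name) can come together in a GetFileByServerRelativeUrl request, here is a small Python sketch. It assumes the documented SharePoint REST pattern <site URL>/_api/web/GetFileByServerRelativeUrl('<relative path>')/$value; how the linked service in this pipeline composes its base URL is not shown here, so treat the function, the folder, and the file name as hypothetical.

```python
def file_download_url(site_url: str, folder_path: str, file_name: str) -> str:
    """Combine the site URL with the server-relative file path, mirroring
    concat(item().Path, '/', item().Name) from the copy activity source."""
    relative_url = f"{folder_path}/{file_name}"
    return f"{site_url}/_api/web/GetFileByServerRelativeUrl('{relative_url}')/$value"


# Hypothetical folder and file name.
print(file_download_url(
    "https://mainri.sharepoint.com/sites/dataengineering",
    "/sites/dataengineering/Shared Documents",
    "report.xlsx",
))
```

The request also needs the access token obtained earlier, passed as a Bearer token in the Authorization header.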
This article focuses on the query of the ADF or ASA Lookup activity for a SharePoint Online List: filtering by modified date, by type, by whether an item is the current version, and so on.
Scenario:
Many organizations like to save data, especially metadata, on a SharePoint Online site. To incrementally extract the latest data, or data modified within a certain date range, from SharePoint Online (SPO), we need to filter on the modified date and check whether an item is the latest version.
For example, items (documents, folders, …) reside on SharePoint Online, and their properties look like this:
We want to know whether they were modified after a certain date, whether they are the latest version, whether each is a document or a folder, and so on. When we retrieve them from SharePoint Online, we get a JSON response that we can check.
Let’s begin.
Solution:
In this article, we focus on the Lookup activity only, especially on the content of the lookup query. I will ignore the Lookup activity’s other configurations and also skip the other activities in the pipeline, such as how to access SPO, how to extract data from SPO, and how to sink it to the destination.
If you are interested in those and want to know more, please review my previous articles:
2) Check that items on SPO were modified since a given date and that their type is “document”
Lookup activity: Lookup_DnA_spo_Sources_array
This Lookup activity filters items saved in a SharePoint library:
ContentType = Document;
File saving path = /sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 - Raw Data, which means I look up only the files saved at this path.
File’s Modified >= pre-set offset day
@concat('$filter=ContentType eq ','''Document''', ' and Path eq ','''/sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 - Raw Data''', ' and ','Modified ge datetime''',formatDateTime(addDays(utcNow(),json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_Inspecting_Offset_Day),'yyyy-MM-dd'),'''')
Here, I use the concept of an “offset”; it is a property I save in the SPO list. Of course, you can provide this offset value in many ways, such as a pipeline parameter, a SQL table, or a file, wherever you like.
For example, if you incrementally ingest data:
on a daily basis, offset = -1
on a weekly basis, offset = -7
every ten days (a customized period), offset = -10, etc.
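The effect of the offset on the query can be sketched in Python. This mirrors the addDays(utcNow(), offset) and formatDateTime(..., 'yyyy-MM-dd') parts of the expression; the function name is hypothetical, and the Path condition is left out to keep the example short.

```python
from datetime import datetime, timedelta, timezone


def build_modified_filter(offset_days: int, now: datetime = None) -> str:
    """Build an OData filter for documents modified on or after
    (now + offset_days); offset_days is negative, e.g. -7 for weekly."""
    now = now or datetime.now(timezone.utc)
    cutoff = (now + timedelta(days=offset_days)).strftime("%Y-%m-%d")
    return f"$filter=ContentType eq 'Document' and Modified ge datetime'{cutoff}'"


# Weekly ingestion, evaluated on a fixed date so the result is reproducible.
fixed_now = datetime(2024, 3, 10, tzinfo=timezone.utc)
print(build_modified_filter(-7, fixed_now))
# $filter=ContentType eq 'Document' and Modified ge datetime'2024-03-03'
```

Because the cutoff is computed at run time, the same pipeline can serve daily, weekly, or custom schedules by changing only the offset stored in the metadata.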
One more example: check whether items saved in SPO are “isCurrentVersion” and their type is “document”.