Azure Data Factory or Synapse Copy Activity with File System

In Azure Data Factory (ADF) or Synapse, using Copy Activity with a File System as a source or sink is common when dealing with on-premises file systems, network file shares, or even cloud-based file storage systems. Here’s an overview of how it works, key considerations, and steps to set it up.

Key Components and setup with File System:

Create a File System Linked Service

Linked Service: For on-premises or network file systems, you typically need a Self-hosted Integration Runtime (SHIR).

Fill in the required fields (a minimal linked service JSON sketch follows this list):

  • Connection: Specify the file system type (e.g., network share or local path).
  • Authentication: Provide the appropriate credentials, such as username/password, or key-based authentication.
  • If the file system is on-premises, configure the Self-hosted Integration Runtime to access it.
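For reference, here is a minimal sketch of what a file system linked service definition might look like in JSON. The server path, account, and integration runtime name are placeholder values, not settings from a real environment; in practice you would store the password as a secure string or reference it from Azure Key Vault.

{
    "name": "FileSystemLinkedService",
    "properties": {
        "type": "FileServer",
        "typeProperties": {
            "host": "\\\\myfileserver\\datashare",
            "userId": "mydomain\\svc-adf",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "MySelfHostedIR",
            "type": "IntegrationRuntimeReference"
        }
    }
}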

Create File System Dataset

Go to Datasets in ADF and create a new dataset. Select File System as the data source.

Configure the dataset to point to the correct file or folder (a sample dataset JSON follows this list):

  • Specify the File Path.
  • Define the file format (e.g., CSV, JSON, XML).
  • Set any schema information if required (for structured data like CSV).
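As an illustration, a delimited-text dataset over that linked service might look roughly like the following; the folder path and file name are placeholders.

{
    "name": "FileSystemCsvDataset",
    "properties": {
        "type": "DelimitedText",
        "linkedServiceName": {
            "referenceName": "FileSystemLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
            "location": {
                "type": "FileServerLocation",
                "folderPath": "incoming/sales",
                "fileName": "orders.csv"
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        }
    }
}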

Considerations:

  • Integration Runtime: For on-premises file systems, the Self-hosted Integration Runtime (SHIR) is essential to securely move data from private networks.
  • Performance: Data transfer speeds depend on network configurations (for on-prem) and ADF’s parallelism settings.
  • File Formats: Ensure proper handling of different file formats (e.g., CSV, JSON, Binary) and schema mapping for structured files.
  • Security: Ensure credentials and network configurations are correctly set up, and consider encryption if dealing with sensitive data.

Common Errors:

  • Connection issues: If the SHIR is not correctly configured, or if there are issues with firewall or network settings, ADF may not be able to access the file system.
  • Permission issues: Ensure that the correct permissions are provided to access the file system (file share, SMB, FTP, etc.).

ADF activity failure vs. pipeline failure, and the pipeline's error handling logic

Understanding how failures in individual activities affect the pipeline as a whole is crucial for building robust data workflows.

Many people have used SSIS before. When they switch from SSIS to Azure Data Factory (ADF) or Azure Synapse Analytics (ASA), the pipeline's logical failure behavior can be confusing. ADF/ASA pipeline orchestration allows conditional logic, letting you take a different path based on the outcome of a previous activity. Using different paths lets you build robust pipelines and incorporate error handling into ETL/ELT logic.

ADF or ASA activity outcome paths

ADF and ASA provide four outcome paths in total: Upon Success (Succeeded), Upon Failure (Failed), Upon Completion (Completed), and Upon Skip (Skipped).

A pipeline can have multiple activities that can be executed in sequence or in parallel.

  • Sequential Execution: Activities are executed one after another.
  • Parallel Execution: Multiple activities run simultaneously.

You can add multiple branches following an activity. For each pipeline run, at most one path is activated, based on the execution outcome of the activity.
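In the pipeline JSON, these paths appear as dependency conditions on the downstream activity. A minimal sketch (the activity and variable names here are hypothetical):

{
    "name": "Record error message",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "CopyFromFileShare",
            "dependencyConditions": [ "Failed" ]
        }
    ],
    "typeProperties": {
        "variableName": "errorMessage",
        "value": {
            "value": "@activity('CopyFromFileShare').error.message",
            "type": "Expression"
        }
    }
}

The "Record error message" activity runs only when CopyFromFileShare takes the Upon Failure path.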

Error Handling Mechanism

When an activity fails within a pipeline, several mechanisms can be employed to handle the failure:

In most cases, pipelines are orchestrated in a parallel, serial, or mixed model. The key point is understanding what happens in the parallel and serial models.

From the upstream ("upon") activity's point of view, the basic principles are:

Multiple dependencies with the same source are a logical "OR".

Multiple dependencies with different sources are a logical "AND".
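For instance, here is a minimal sketch of the "same source" case: a downstream activity that depends on one upstream activity with two dependency conditions runs if either condition is met (OR). The activity name is hypothetical.

"dependsOn": [
    {
        "activity": "CopyFromFileShare",
        "dependencyConditions": [ "Succeeded", "Skipped" ]
    }
]

By contrast, listing two different upstream activities in dependsOn means every listed activity must satisfy its condition before the downstream activity runs (AND).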

Different error handling mechanisms lead to different pipeline statuses: some pipelines fail while others succeed. Pipeline success or failure is determined as follows:

  • Evaluate the outcome of all leaf activities. If a leaf activity was skipped, evaluate its parent activity instead.
  • The pipeline result is Success if and only if all evaluated nodes succeed.

Let us discuss in detail.

Multiple dependencies with the same source

This corresponds to the "serial" or "sequence" model.

How “Serial” pipeline failure is determined

As we develop more complicated and resilient pipelines, it is sometimes necessary to introduce conditional execution into our logic: execute a certain activity only if certain conditions are met. When one or more activities fail while other activities succeed in a pipeline, what is the status of the entire pipeline? Success? Failure? How is pipeline failure determined?

ADF/ASA takes its own view here. Software engineers are used to the familiar forms "if … then … else …" and "try … catch …", so let's use that developer idiom.

Single upon activity (serial model) with multiple downstream activities:

Pattern | Downstream paths defined | Upon activity | Success-path act1 | Failure-path act2 | Pipeline status | Comment
try … catch … | Success path only | Success | Success | – | Success |
try … catch … | Success path only | Failed | Success | – | Failed |
try … catch … | Failure path only | Failed | – | Failed | Failed |
try … catch … | Failure path only | Failed | – | Success | Success | Not really a success
if … then … else … | Both success and failure paths | Success | Success | – | Success |
if … then … else … | Both success and failure paths | Failed | – | Success | Failed |
if … then … else … | Both success and failure paths | Failed | – | Failed | Failed |
if … skip … else … | Success, failure, and skip paths | Success | Success | Skipped | Success | Skip path is skipped
Scenario 1: Try … catch …

Downstream success path only:
upon act success >> downstream act success >> pipeline Success

Downstream success path only:
upon act failed >> downstream act success >> pipeline Failed

Downstream failure path only:
upon act failed >> downstream act success >> pipeline Success

Scenario 2: If … then … else …

The pipeline defines both the Upon Failure and Upon Success paths. With this approach, the pipeline is marked as failed when the upon activity fails, even if the Upon Failure path succeeds.

Both success & failure paths:
upon act failed >> downstream act success >> pipeline Failed

Both success & failure paths:
upon act failed >> downstream act failed >> pipeline Failed

Scenario 3: If … skip … else …

Both success & failure paths, plus a skip path:
upon act success >> downstream act success >> skip path is skipped >> pipeline Success

Multiple dependencies with different sources

This corresponds to the "parallel" model; its logic is a logical "AND".

Scenario 4:

Upon act 1 success and upon act 2 success >> downstream act success >> pipeline Success.

Upon act 1 success and upon act 2 failed >> downstream act success >> pipeline Success.

Pay attention here: the "Set variable failed" activity is connected through its "fail" path.

That means the "set variable success" branch condition is met (true), and although the "set variable failed" activity itself failed, its failure-path condition is also met (true). With both branch conditions true, the downstream activity runs and the pipeline shows Success.

Now, let's try this: connect the "Set variable failed" activity through its "success" path instead and see what the pipeline shows. The pipeline fails.

Why? Because the "Set variable failed" branch condition is no longer met, even though the "set variable success" condition is true. True AND False = False, so the following activity, "set variable act", is skipped: it will not execute, it will not run, and the pipeline fails.

You might immediately realize that if we instead connect the "Set variable failed" activity through its "completion" path, then no matter whether it succeeds or fails, the downstream "set variable act" activity will not be skipped, and the pipeline shows Success.
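As a rough sketch of the "different sources = AND" rule, the dependsOn section of the downstream activity in this scenario might look like the following (using the activity names from the example above; the variable name is hypothetical):

{
    "name": "set variable act",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "set variable success",
            "dependencyConditions": [ "Succeeded" ]
        },
        {
            "activity": "set variable failed",
            "dependencyConditions": [ "Failed" ]
        }
    ],
    "typeProperties": {
        "variableName": "result",
        "value": "both branch conditions were met"
    }
}

Both conditions must hold for "set variable act" to run. Change the second condition to "Succeeded" and you get the True AND False case described above; change it to "Completed" and that branch condition holds regardless of the outcome.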

Error Handling

Sample error handling patterns

The pattern is equivalent to a try-catch block in coding. An activity might fail in a pipeline. When it fails, the customer needs to run an error handling job to deal with it. However, a single activity failure shouldn't block the next activities in the pipeline. For instance, I attempt to run a copy job, moving files into storage, and it might fail halfway through. In that case, I want to delete the partially copied, unreliable files from the storage account (my error handling step), but I'm OK to proceed with other activities afterwards.

To set up the pattern:

  • Add the first activity
  • Add the error handling activity to the Upon Failure path
  • Add the second activity, but don't connect it to the first activity
  • Connect both the Upon Failure and Upon Skip paths from the error handling activity to the second activity

The error handling job runs only when the first activity fails. The next activity will run regardless of whether the first activity succeeds or not.
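Sketched as a pipeline JSON fragment, the wiring described in the steps above might look like this. The activity names are placeholders and typeProperties are omitted for brevity: the error handler hangs off the first activity's Upon Failure path, and the second activity hangs off the error handler's Upon Failure and Upon Skip paths, exactly as listed.

"activities": [
    {
        "name": "CopyToStorage",
        "type": "Copy"
    },
    {
        "name": "DeletePartialFiles",
        "type": "Delete",
        "dependsOn": [
            { "activity": "CopyToStorage", "dependencyConditions": [ "Failed" ] }
        ]
    },
    {
        "name": "NextActivity",
        "type": "Copy",
        "dependsOn": [
            { "activity": "DeletePartialFiles", "dependencyConditions": [ "Failed", "Skipped" ] }
        ]
    }
]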

Generic error handling

We have multiple activities running sequentially in the pipeline. If any fails, I need to run an error handling job to clear the state, and/or log the error.

For instance, I have sequential copy activities in the pipeline. If any of these fails, I need to run a script job to log the pipeline failure.

To set up the pattern:

  • Build a sequential data processing pipeline
  • Add a generic error handling step at the end of the pipeline
  • Connect both the Upon Failure and Upon Skip paths from the last activity to the error handling activity

The last step, Generic Error Handling, will only run if any of the previous activities fails. It will not run if they all succeed.

You can add multiple activities for error handling.
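A rough sketch of this wiring, assuming three sequential copy activities named Copy1, Copy2, and Copy3 (hypothetical names) followed by a logging step (typeProperties omitted for brevity):

{
    "name": "LogPipelineFailure",
    "type": "Script",
    "dependsOn": [
        { "activity": "Copy3", "dependencyConditions": [ "Failed", "Skipped" ] }
    ]
}

Because the copies run sequentially, a failure anywhere in the chain leaves Copy3 either Failed or Skipped, so LogPipelineFailure runs; if all copies succeed, it is skipped.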

Summary

Handling activity failures effectively is crucial for building robust pipelines in Azure Data Factory. By employing retry policies, conditional paths, and other error-handling strategies, you can ensure that your data workflows are resilient and capable of recovering from failures, minimizing the impact on your overall data processing operations.

If you have any questions, please do not hesitate to contact me at william. chen @mainri.ca (remove all spaces from the email address 😊)

Azure Data Factory or Synapse Analytics Lookup Activity: Modified Date Filter Query for a SharePoint Online List

This article focuses on the ADF or ASA Lookup activity query for a SharePoint Online list: filtering on modified date, content type, whether an item is the current version, and so on.

Scenario:

Many organizations like to save data, especially metadata, on a SharePoint Online site. To incrementally extract the latest data, or data modified within a certain date range, from SharePoint Online (SPO), we need to filter on the modified date and check whether each item is the latest version.

For example, items (documents, folders, …) residing on SharePoint Online have properties that look like this:

{
    "count": 110,
    "value": [
        ……
        {
            "ContentTypeID": "0x010100EE….B186B23",
            "Name": "Test Customized reports_SQL Joins.xlsx",
            "ComplianceAssetId": null,
            "Title": null,
            "Description": null,
            "ColorTag": null,
            "Id": 9,
            "ContentType": "Document",
            "Created": "2023-04-25T10:53:24Z",
            "CreatedById": 61,
            "Modified": "2023-08-23T15:13:56Z",
            "ModifiedById": 61,
            "CopySource": null,
            "ApprovalStatus": "0",
            "Path": "/sites/mysite/.../Customized Reports SQL joins",
            "CheckedOutToId": null,
            "VirusStatus": "73382",
            "IsCurrentVersion": true,
            "Owshiddenversion": 19,
            "Version": "9.0"
        },
        …..

We want to know whether items were modified after a certain date, whether they are the latest version, whether they are a document or a folder, and so on. We check this against the JSON response we receive when we retrieve the items from SharePoint Online.

Let’s begin.

Solution: 

In this article, we focus on the Lookup activity only, especially the content of the lookup query. I will skip the Lookup activity's other configurations as well as the other steps in the pipeline, such as how to access SPO, how to extract data from SPO, and how to sink it to the destination.

If you are interested in those details and want to know more, please review my previous articles.

To filter item properties out of SPO's JSON response, we need to build dynamic content for the lookup's query.

1) Check list status: active or not.

Lookup activity: lkp metadata of Source to Landing from SPO

Get metadata from SPO

@concat(
    '$filter=SystemName eq '''
    , pipeline().parameters.System
    , ''' and StatusValue eq ''Active'''
)

2) Check that items on SPO were modified after a certain date and are of type "Document"

Lookup activity: Lookup_DnA_spo_Sources_array

This Lookup activity filters items saved in the SharePoint library by:

ContentType = Document;

File saving path = /sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 – Raw Data
(that means I look up only the files saved at this path);

File's Modified date >= the pre-set offset day.

@concat(
    '$filter=ContentType eq ', '''Document'''
    , ' and Path eq ', '''/sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 - Raw Data'''
    , ' and '
    , 'Modified ge datetime'''
    , formatDateTime(addDays(utcNow(), json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_Inspecting_Offset_Day), 'yyyy-MM-dd')
    , '''')

Here I use the concept of an "offset"; it is a property I save in the SPO list. Of course, you can provide this offset value in many ways: as a pipeline parameter, saved in a SQL table, saved in a file, etc., wherever you like.

For example, if you incrementally ingest data:

  • on a daily basis: offset = -1
  • on a weekly basis: offset = -7
  • every ten days (a customized period): offset = -10
  • etc.

One more example: suppose you want to check whether items saved in SPO are the current version ("IsCurrentVersion") and of type "Document".
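A sketch of such a query, modeled on the previous expression (the boolean filter syntax for IsCurrentVersion is my assumption here, not a filter taken from a tested pipeline):

@concat(
    '$filter=ContentType eq ', '''Document'''
    , ' and IsCurrentVersion eq true'
)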

That’s all.

If you have any questions, please do not hesitate to contact me at william. chen @mainri.ca (remove all spaces from the email address 😊)