In Azure Data Factory (ADF) or Synapse, using Copy Activity with a File System as a source or sink is common when dealing with on-premises file systems, network file shares, or even cloud-based file storage systems. Here’s an overview of how it works, key considerations, and steps to set it up.
Key Components and setup with File System:
Create a File System Linked Service
Linked Service: For on-premises or network file systems, you typically need a Self-hosted Integration Runtime (SHIR).
Fill in the required fields:
Connection: Specify the file system type (e.g., network share or local path).
Authentication: Provide the appropriate credentials, typically a Windows username and password that has access to the share.
If the file system is on-premises, configure the Self-hosted Integration Runtime to access it (a JSON sketch of such a linked service follows).
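For reference, a minimal File System (FileServer) linked service definition might look like the sketch below. The server path, account, Key Vault linked service, and integration runtime names are hypothetical placeholders; replace them with your own values.

```json
{
  "name": "LS_OnPremFileShare",
  "properties": {
    "type": "FileServer",
    "typeProperties": {
      "host": "\\\\fileserver01\\share\\data",
      "userId": "CONTOSO\\svc_adf",
      "password": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "LS_KeyVault", "type": "LinkedServiceReference" },
        "secretName": "fileshare-password"
      }
    },
    "connectVia": {
      "referenceName": "SelfHostedIR",
      "type": "IntegrationRuntimeReference"
    }
  }
}
```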
Create File System Dataset
Go to Datasets in ADF and create a new dataset. Select File System as the data source.
Configure the dataset to point to the correct file or folder:
Specify the File Path.
Define the file format (e.g., CSV, JSON, XML).
Set any schema information if required (for structured data like CSV). A sample dataset definition follows.
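As an illustration, a delimited-text dataset pointing at that share might be defined as below; the dataset name, folder path, and file name are placeholders.

```json
{
  "name": "DS_FileShare_Csv",
  "properties": {
    "type": "DelimitedText",
    "linkedServiceName": { "referenceName": "LS_OnPremFileShare", "type": "LinkedServiceReference" },
    "typeProperties": {
      "location": {
        "type": "FileServerLocation",
        "folderPath": "inbound/sales",
        "fileName": "orders.csv"
      },
      "columnDelimiter": ",",
      "firstRowAsHeader": true
    }
  }
}
```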
Considerations:
Integration Runtime: For on-premises file systems, the Self-hosted Integration Runtime (SHIR) is essential to securely move data from private networks.
Performance: Data transfer speeds depend on network configurations (for on-prem) and ADF’s parallelism settings.
File Formats: Ensure proper handling of different file formats (e.g., CSV, JSON, Binary) and schema mapping for structured files.
Security: Ensure credentials and network configurations are correctly set up, and consider encryption if dealing with sensitive data.
Common Errors:
Connection issues: If the SHIR is not correctly configured, or if there are issues with firewall or network settings, ADF may not be able to access the file system.
Permission issues: Ensure that the correct permissions are provided to access the file system (file share, SMB, FTP, etc.).
In Azure Data Factory (ADF), both the Copy Activity using wildcards (*.*) and the Get Metadata activity for retrieving a file list are designed to work with multiple files for copying or moving. However, they operate differently and are suited to different scenarios.
1. Copy Activity with Wildcard *.*
Purpose: Automatically copies multiple files from a source to a destination using wildcards.
Use Case: Used when you want to move, copy, or process multiple files in bulk that match a specific pattern (e.g., all .csv files or any file in a folder).
Wildcard Support: The wildcard characters (* for any characters, ? for a single character) help in defining a set of files to be copied. For example:
*.csv will copy all .csv files in the specified folder.
file*.json will copy all files starting with file and having a .json extension.
Bulk Copy: Enables copying multiple files without manually specifying each one.
Common Scenarios:
Copy all files from one folder to another, filtering based on extension or name pattern.
Copy files that were uploaded on a specific date, assuming the date is part of the file name.
Automatic File Handling: ADF will automatically copy all files matching the pattern in a single operation.
Key Benefit: Efficient for bulk file transfers with minimal configuration. You don’t need to explicitly get the file list; it uses wildcards to copy all matching files.
Example Scenario:
You want to copy all .csv files from a folder in Blob Storage to a Data Lake without manually listing them.
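To make this concrete, here is a hedged sketch of such a Copy activity in pipeline JSON. It assumes Binary datasets (placeholder names) on Blob Storage and ADLS Gen2, so the files are moved as-is; the wildcard is set through wildcardFolderPath and wildcardFileName in the source store settings.

```json
{
  "name": "CopyAllCsvFiles",
  "type": "Copy",
  "inputs":  [ { "referenceName": "DS_Blob_SourceFolder_Binary", "type": "DatasetReference" } ],
  "outputs": [ { "referenceName": "DS_Adls_TargetFolder_Binary", "type": "DatasetReference" } ],
  "typeProperties": {
    "source": {
      "type": "BinarySource",
      "storeSettings": {
        "type": "AzureBlobStorageReadSettings",
        "recursive": true,
        "wildcardFolderPath": "input",
        "wildcardFileName": "*.csv"
      },
      "formatSettings": { "type": "BinaryReadSettings" }
    },
    "sink": {
      "type": "BinarySink",
      "storeSettings": { "type": "AzureBlobFSWriteSettings" }
    }
  }
}
```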
2. Get Metadata Activity (File List Retrieval)
Purpose: Retrieves a list of files in a folder, which you can then process individually or use for conditional logic.
Use Case: Used when you need to explicitly obtain the list of files in a folder to apply custom logic, processing each file separately (e.g., for-looping over them).
No Wildcard Support: The Get Metadata activity does not use wildcards directly. Instead, it returns all the files (or specific child items) in a folder. If filtering by name or type is required, additional logic is necessary (e.g., using expressions or filters in subsequent activities).
Custom Processing: After retrieving the file list, you can perform additional steps like looping over each file (with the ForEach activity) and copying or transforming them individually.
Common Scenarios:
Retrieve all files in a folder and process each one in a custom way (e.g., run different processing logic depending on the file name or type).
Check for specific files, log them, or conditionally process based on file properties (e.g., last modified time).
Flexible Logic: Since you get a list of files, you can apply advanced logic, conditions, or transformations for each file individually.
Key Benefit: Provides explicit control over how each file is processed, allowing dynamic processing and conditional handling of individual files.
Example Scenario:
You retrieve a list of files in a folder, loop over them, and process only files that were modified today or have a specific file name pattern.
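A minimal, hypothetical pipeline fragment illustrating the pattern: Get Metadata returns childItems for a folder dataset, and a ForEach loops over the result and copies each file as binary. All activity, dataset, and parameter names here are assumptions; filtering (a Filter activity or an If Condition inside the loop) would be added where needed.

```json
{
  "activities": [
    {
      "name": "GetFileList",
      "type": "GetMetadata",
      "typeProperties": {
        "dataset": { "referenceName": "DS_SourceFolder", "type": "DatasetReference" },
        "fieldList": [ "childItems" ]
      }
    },
    {
      "name": "ForEachFile",
      "type": "ForEach",
      "dependsOn": [ { "activity": "GetFileList", "dependencyConditions": [ "Succeeded" ] } ],
      "typeProperties": {
        "items": { "value": "@activity('GetFileList').output.childItems", "type": "Expression" },
        "activities": [
          {
            "name": "CopyOneFile",
            "type": "Copy",
            "inputs":  [ { "referenceName": "DS_SourceFile", "type": "DatasetReference",
                           "parameters": { "fileName": "@item().name" } } ],
            "outputs": [ { "referenceName": "DS_TargetFile", "type": "DatasetReference",
                           "parameters": { "fileName": "@item().name" } } ],
            "typeProperties": {
              "source": { "type": "BinarySource" },
              "sink":   { "type": "BinarySink" }
            }
          }
        ]
      }
    }
  ]
}
```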
Side-by-Side Comparison
| Feature | Copy Activity (Wildcard *.*) | Get Metadata Activity (File List Retrieval) |
| --- | --- | --- |
| Purpose | Copies multiple files matching a wildcard pattern. | Retrieves a list of files from a folder for custom processing. |
| Wildcard Support | Yes (*.*, *.csv, file?.json, etc.). | No; retrieves all items from the folder (no filtering by pattern). |
| File Selection | Automatically selects files based on the wildcard pattern. | Retrieves the entire list of files, then requires a filter for specific file selection. |
| Processing Style | Bulk copying based on file patterns. | Custom logic or per-file processing using the ForEach activity. |
| Use Case | Simple and fast copying of multiple files matching a pattern. | Used when you need more control over each file (e.g., looping, conditional processing). |
| File Count Handling | Automatically processes all matching files in one step. | Returns a list of all files in the folder, and each file can be processed individually. |
| Efficiency | Efficient for bulk file transfer; handles all matching files in one operation. | More complex, as it requires looping through files for individual actions. |
| Post-Processing Logic | No looping required; processes files in bulk. | Requires a ForEach activity to iterate over the file list for individual processing. |
| Common Scenarios | Copy all files with a .csv extension; move files with a specific prefix or suffix. | Retrieve all files and apply custom logic for each one; check file properties (e.g., last modified date). |
| Control Over Individual Files | Limited; bulk operation for all files matching the pattern. | Full control over each file, allowing dynamic actions (e.g., conditional processing, transformations). |
| File Properties Access | No access to specific file properties during the copy operation. | Access to file properties like size, last modified date, etc., through metadata retrieval. |
| Execution Time | Fast for copying large sets of files matching a pattern. | Slower due to the need to process each file individually in a loop. |
| Use of Additional Activities | Often works independently without the need for further processing steps. | Typically used with ForEach, If Condition, or other control activities for custom logic. |
| Scenarios to Use | Copying all files in a folder that match a certain extension (e.g., *.json); moving large numbers of files with minimal configuration. | When you need to check file properties before processing; for dynamic file processing (e.g., applying transformations based on file name or type). |
When to Use Each:
Copy Activity with Wildcard:
Use when you want to copy multiple files in bulk and don’t need to handle each file separately.
Best for fast, simple file transfers based on file patterns.
Get Metadata Activity with File List:
Use when you need explicit control over each file or want to process files individually (e.g., with conditional logic).
Ideal when you need to loop through files, check properties, or conditionally process files.
Understanding how failures in individual activities affect the pipeline as a whole is crucial for building robust data workflows.
Some people have used SSIS previously; when they switch from SSIS to Azure Data Factory (ADF) or Azure Synapse Analytics (ASA), they may be confused by ADF's or ASA's pipeline logical failure mechanism. ADF/ASA pipeline orchestration allows conditional logic and enables the user to take a different path based upon the outcome of a previous activity. Using different paths allows users to build robust pipelines and incorporate error handling into ETL/ELT logic.
ADF or ASA activity outcome paths
ADF and ASA have four dependency paths in total: Upon Success, Upon Failure, Upon Completion, and Upon Skip.
A pipeline can have multiple activities that can be executed in sequence or in parallel.
Sequential Execution: Activities are executed one after another.
Parallel Execution: Multiple activities run simultaneously.
You are able to add multiple branches following an activity. For each pipeline run, at most one path is activated, based on the execution outcome of the activity.
Error Handling Mechanism
When an activity fails within a pipeline, several mechanisms can be employed to handle the failure:
In most cases, pipelines are orchestrated in a parallel, serial, or mixed model. The key point is understanding what will happen in the parallel and serial models.
From the downstream (dependent) activity's point of view, the basic principles are:
Multiple dependencies with the same source are logical “OR”
Multiple dependencies with different sources are logical “AND”
Different error handling mechanisms lead to different statuses for the pipeline: some pipelines fail while others succeed. Pipeline success and failure are determined as follows:
Evaluate the outcome of all leaf activities. If a leaf activity was skipped, evaluate its parent activity instead.
The pipeline result is success if and only if all evaluated nodes succeed. The sketch below shows how these paths appear as dependency conditions in activity JSON.
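These paths map to dependency conditions in the pipeline JSON. The hypothetical fragment below (Wait activities are used only as stand-ins) illustrates both principles: HandleEither lists two conditions on the same upstream activity, so either one activates it (logical OR), while JoinStep depends on two different activities, so both conditions must be met (logical AND).

```json
{
  "activities": [
    { "name": "StepA", "type": "Wait", "typeProperties": { "waitTimeInSeconds": 1 } },
    { "name": "StepB", "type": "Wait", "typeProperties": { "waitTimeInSeconds": 1 } },
    {
      "name": "HandleEither",
      "type": "Wait",
      "typeProperties": { "waitTimeInSeconds": 1 },
      "dependsOn": [
        { "activity": "StepA", "dependencyConditions": [ "Succeeded", "Skipped" ] }
      ]
    },
    {
      "name": "JoinStep",
      "type": "Wait",
      "typeProperties": { "waitTimeInSeconds": 1 },
      "dependsOn": [
        { "activity": "StepA", "dependencyConditions": [ "Succeeded" ] },
        { "activity": "StepB", "dependencyConditions": [ "Succeeded" ] }
      ]
    }
  ]
}
```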
Let us discuss in detail.
Multiple dependencies with the same source
This behaves like a "serial" or sequential flow; the conditions are combined with logical OR.
How “Serial” pipeline failure is determined
As we develop more complicated and resilient pipelines, it is sometimes necessary to introduce conditional execution into our logic: execute a certain activity only if certain conditions are met. At this point, if one or more activities fail while other activities succeed in a pipeline, what is the status of the entire pipeline? Success? Failure? How is pipeline failure determined?
In fact, ADF/ASA has its own way of looking at this. Software engineers are used to the customary forms "if … then … else …" and "try … catch …", so let's use the developer's idiom.
Scenario 1: Try … Catch …
A single upstream activity (serial model) with a downstream error-handling activity on the Upon Failure path only:
upstream activity failed >> downstream (error handling) activity succeeded >> pipeline succeeded
Scenario 2:
If … then … Else …
The pipeline defines both the Upon Failure and the Upon Success paths. This approach renders the pipeline failed, even if the Upon Failure path succeeds.
Both success & failure paths:
upstream activity failed >> failure-path (error handling) activity succeeded >> pipeline failed
Both success & failure paths:
upstream activity failed >> failure-path (error handling) activity failed >> pipeline failed
Scenario 3
If …Skip… Else ….
Both success & failure paths, plus a skip path:
upstream activity succeeded >> success-path activity succeeded >> the skip path is skipped >> pipeline succeeded
Multiple dependencies with different sources
This behaves like a "parallel" flow; its logic is "AND".
Scenario 4:
Upstream activity 1 succeeded and upstream activity 2 succeeded >> downstream activity succeeded >> pipeline succeeded.
Upstream activity 1 succeeded and upstream activity 2 failed >> downstream activity succeeded >> pipeline succeeded.
Pay attention: the "Set variable failed" activity is connected through the "Upon Failure" path.
That means:
the "set variable success" dependency condition (Succeeded) is met, so it evaluates to true;
although the "Set variable failed" activity failed, its dependency condition (Failed) is also met, so it evaluates to true as well;
since both conditions are true (true AND true), the downstream activity runs,
and the pipeline shows "Succeeded".
Now, let's try this: connect the "Set variable failed" activity through the "Upon Success" path instead and see what the pipeline shows. The pipeline fails.
Why? Because the "Set variable failed" condition is not met, even though the "set variable success" condition is met. True AND False = False, so the following activity, "set variable act", is skipped; it does not execute and does not run, and the pipeline fails.
You might immediately realize that once we connect the "Set variable failed" branch through the "Upon Completion" path, then no matter whether that activity succeeds or fails, the downstream activity "set variable act" will not be skipped, and the pipeline shows success. A sketch of this wiring in pipeline JSON follows.
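A minimal sketch of that last, working variant, assuming a pipeline variable named result and keeping the activity names from the example:

```json
{
  "name": "set variable act",
  "type": "SetVariable",
  "typeProperties": { "variableName": "result", "value": "done" },
  "dependsOn": [
    { "activity": "set variable success", "dependencyConditions": [ "Succeeded" ] },
    { "activity": "Set variable failed",  "dependencyConditions": [ "Completed" ] }
  ]
}
```

Because the two dependencies reference different source activities they are AND-ed, and the Completed condition on "Set variable failed" is met whether that activity succeeds or fails, so "set variable act" always runs and the pipeline reports success.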
Error Handling
Sample error handling patterns
The pattern is equivalent to a try-catch block in coding. An activity might fail in a pipeline. When it fails, the customer needs to run an error-handling job to deal with it. However, a single activity failure shouldn't block the next activities in the pipeline. For instance, I attempt to run a copy job, moving files into storage, but it might fail halfway through. In that case, I want to delete the partially copied, unreliable files from the storage account (my error-handling step), but I am OK to proceed with other activities afterwards.
To set up the pattern:
Add first activity
Add error handling to the UponFailure path
Add second activity, but don’t connect to the first activity
Connect both UponFailure and UponSkip paths from the error handling activity to the second activity
Error Handling job runs only when First Activity fails. Next Activity will run regardless if First Activity succeeds or not.
Generic error handling
We have multiple activities running sequentially in the pipeline. If any fails, I need to run an error handling job to clear the state, and/or log the error.
For instance, I have sequential copy activities in the pipeline. If any of these fails, I need to run a script job to log the pipeline failure.
To set up the pattern:
Build sequential data processing pipeline
Add generic error handling step to the end of the pipeline
Connect both Upon Failure and Upon Skip paths from the last activity to the error handling activity
The last step, Generic Error Handling, will only run if any of the previous activities fails. It will not run if they all succeed.
You can add multiple activities for error handling.
Summary
Handling activity failures effectively is crucial for building robust pipelines in Azure Data Factory. By employing retry policies, conditional paths, and other error-handling strategies, you can ensure that your data workflows are resilient and capable of recovering from failures, minimizing the impact on your overall data processing operations.
if you have any questions, please do not hesitate to contact me at william. chen @mainri.ca (remove all space from the email account 😊)
In this article I will provide a fully metadata-driven solution for using Azure Data Factory (ADF) or Synapse Analytics (ASA) to incrementally copy multiple data sets at one time from SharePoint Online (SPO) and sink them to ADLS Gen2.
Previously, I have published articles about ADF and ASA working with SPO. If you are interested in specific topics, please look at the related articles here, or email me at william . chen @ mainri.ca (please remove the spaces from the email address 🙂).
Scenario and Requirements
Metadata driven. All metadata are saved in a SharePoint list, such as Tenant ID, Client ID, SPO site name, ADLS account, inspecting offset days for incremental loading, etc.
Initial full load, then monitor the data set status; once it updates, load incrementally, for example on a daily basis.
Solution
Retrieve metadata >> retrieve the client secret >> request an access token >> inspect and generate the list of data sets of interest >> iteratively copy each data set of interest and sink it to the destination storage.
Prerequisite
Register SharePoint (SPO) application in Azure Active Directory (AAD).
Grant SPO site permission to registered application in AAD.
Provision Resource Group, ADF (ASA) and ADLS in your Azure Subscription.
I have other articles that discuss these in detail. If you need a review, please refer to the related articles.
Let us begin, step by step in detail. I will mention the key points for every step.
Step 1:
Using Lookup activity to retrieve all metadata from SharePoint Online
Firstly, create a SharePoint Online List type linked service – SPO_ITDataGovernanceWorkgroup. You need to provide:
SharePoint site URL: replace it with yours; it looks like https://[your_domain].sharepoint.com/sites/[your_site_name]. Mine is https://mainri.sharepoint.com/sites/ITDataGovernanceWorkgroup.
Tenant ID: the tenant ID under which your application resides. You can find it on the Azure portal's Microsoft Entra ID (formerly Azure AD) overview page.
Service principal ID (Client ID): the application (client) ID of your application registered in Microsoft Entra ID. Make sure to grant the SharePoint site permission to this application.
Service principal key: the client secret of your application registered in Microsoft Entra ID. Mine is saved in Azure Key Vault (a JSON sketch of this linked service follows).
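Put together, the linked service JSON looks roughly like the sketch below; the site URL is mine, while the tenant ID, client ID, Key Vault linked service name, and secret name are placeholders you would replace.

```json
{
  "name": "SPO_ITDataGovernanceWorkgroup",
  "properties": {
    "type": "SharePointOnlineList",
    "typeProperties": {
      "siteUrl": "https://mainri.sharepoint.com/sites/ITDataGovernanceWorkgroup",
      "tenantId": "<tenant-guid>",
      "servicePrincipalId": "<application-client-id>",
      "servicePrincipalKey": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "LS_KeyVault", "type": "LinkedServiceReference" },
        "secretName": "spo-client-secret"
      }
    }
  }
}
```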
Secondly, using the above linked service, create a SharePoint Online List type dataset – DS_SPO_ITDataGovernanceWorkgroup – and parameterize the dataset.
I name the parameter "List". This parameter lets the Lookup activity know where your metadata resides (mine is called SourceToLanding).
Now we are ready to configure the Lookup activity:
Source dataset: use the dataset created above – DS_SPO_ITDataGovernanceWorkgroup
Query: @concat('$filter=SystemName eq ''',pipeline().parameters.System,''' and StatusValue eq ''Active''')
This query filters for the SPO list of interest, where my metadata is saved. My metadata list in SPO looks like this.
This Lookup activity returns the metadata retrieved from the SPO list; it looks like this.
The response is in JSON format. My "SourceParameterJSON" value is a string, but it matches JSON format well and can be converted to JSON.
Step 2:
Get Client Secret
To get an SPO access token, we need a Tenant ID, a Client ID, and a Client Secret.
Tenant ID: you can get this from Microsoft Entra ID (formerly Azure Active Directory).
Client ID: when you register your application in Microsoft Entra ID, it generates one, called the Application (Client) ID. You can get this from the Azure portal >> Entra ID.
Client Secret: after you registered your application in Microsoft Entra ID, you created a client secret and immediately copied and kept that value; the value is not shown again after that.
As I mentioned above, my client secret is saved in Azure Key Vault. To get the client secret, use a Web activity to retrieve it (if you have the client secret on hand, you can skip this activity and move directly to the next activity, Get SPO Token).
Method: GET
Authentication: System Assigned Managed Identity
Resource: https://vault.azure.net
URL:
@{concat( concat( json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_KeyVault_URL , '/secrets/' ) , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_keyVault_SecretsName , '?api-version=7.0' )}
Attention: from the above URL content you can see that the SourceParameterJSON section matches JSON format well, BUT it is NOT JSON; it is a string, so I convert the string to JSON. ?api-version=7.0 is another key point: you must append it to your URL. The Web activity sketch below shows these settings together.
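A hedged sketch of this Web activity in JSON, using the activity name Get_Client_Secret (which is referenced later in the token request) and the URL expression shown above:

```json
{
  "name": "Get_Client_Secret",
  "type": "WebActivity",
  "typeProperties": {
    "method": "GET",
    "url": "@{concat(json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_KeyVault_URL, '/secrets/', json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_keyVault_SecretsName, '?api-version=7.0')}",
    "authentication": { "type": "MSI", "resource": "https://vault.azure.net" }
  }
}
```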
Create a Web Activity to get the access token from SharePoint Online:
URL: https://accounts.accesscontrol.windows.net/[Tenant-ID]/tokens/OAuth/2. Replace the tenant ID.
Mine looks like: @{concat('https://accounts.accesscontrol.windows.net/' ,json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id ,'/tokens/OAuth/2' )}
Body: replace the client ID (application ID), client secret (application key), tenant ID, and tenant name (of the SharePoint tenant).
Mine looks like: @{concat('grant_type=client_credentials&client_id=' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Client_id , '@' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id ,'&client_secret=' , activity('Get_Client_Secret').output.value , '&resource=00000003-0000-0ff1-ce00-000000000000/infrastructureontario.sharepoint.com@' , json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_Tenant_id )}
Attention: as mentioned in Step 2, pay attention to the JSON conversion of the "SourceParameterJSON" section. A JSON sketch of this token-request Web activity follows.
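A hedged sketch of this token-request Web activity, named Get SPO Token as referenced earlier. The <...> placeholders stand for the json(activity('lkp metadata of Source to Landing from SPO')...) expressions shown above; note the POST method and the Content-Type header, which the token endpoint expects.

```json
{
  "name": "Get SPO Token",
  "type": "WebActivity",
  "typeProperties": {
    "method": "POST",
    "url": "@{concat('https://accounts.accesscontrol.windows.net/', <tenant-id expression>, '/tokens/OAuth/2')}",
    "headers": { "Content-Type": "application/x-www-form-urlencoded" },
    "body": "@{concat('grant_type=client_credentials&client_id=', <client-id expression>, '@', <tenant-id expression>, '&client_secret=', activity('Get_Client_Secret').output.value, '&resource=00000003-0000-0ff1-ce00-000000000000/<your-domain>.sharepoint.com@', <tenant-id expression>)}"
  }
}
```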
1. Create a linked service called "ls_SPO_DnA_http_baseUrl" and parameterize the linked service.
Provide:
SharePoint site URL pattern: https://<your domain>.sharepoint.com/sites/<SPO site Name>. Replace <your domain> and <SPO site Name>. Mine looks like: https://mainri.sharepoint.com/sites/dataengineering
Tenant ID
Service principal ID (also called client ID or application ID)
2. Create a SharePoint List type dataset called "ds_DnA_spo_sources_array" and parameterize the dataset.
Linked service: ls_SPO_DnA_http_baseUrl (you just created)
Parameter:
ds_DnA_spoList_Name
ds_par_spo_site
The dataset looks like this:
3. Configure the Lookup activity
Use the results returned by the upstream activities to configure the Lookup activity.
source dataset: ds_DnA_spo_sources_array
Dynamic content: ds_DnA_spoList_Name: @json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_DnA_spoList_name
ds_par_spo_site: @json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_SPO_site
Query: I ingest data incrementally, so I use a query to filter for only the items of interest.
@concat('$filter=ContentType eq ','''Document''' , ' and ' ,'Modified ge datetime''' ,formatDateTime(addDays(utcNow() ,json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_Inspecting_Offset_Day),'yyyy-MM-dd'), '''')
The value of 'GetFileByServerRelativeUrl' is the key point of this work. It points to the specific URL location of the data set. In fact, all the upstream work is aimed at generating the specific content for this link item!
Cheers, we are almost there!
2. Create an HTTP Binary source dataset called "ds_spo_src" and parameterize it.
Parameters: RelativeURL, ls_par_SPO_site
Linked service: ls_SPO_HttpServer, which we just created.
3. Configure copy activity’s “Source”
Request method: GET
Source dataset: ds_spo_src
RelativeURL:
@{ concat( item().Path ,'/' ,item().Name ) }
ls_par_SPO_site:
@json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_par_SPO_site
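For reference, the source side of this Copy activity might look like the sketch below in JSON. In particular, an Authorization header carrying the bearer token retrieved earlier (here assumed to come from the Get SPO Token activity's access_token output) is normally added via additionalHeaders so that SharePoint accepts the request; treat the exact shape as illustrative.

```json
{
  "source": {
    "type": "BinarySource",
    "storeSettings": {
      "type": "HttpReadSettings",
      "requestMethod": "GET",
      "additionalHeaders": "@{concat('Authorization: Bearer ', activity('Get SPO Token').output.access_token)}"
    },
    "formatSettings": { "type": "BinaryReadSettings" }
  }
}
```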
This article focuses on the ADF or ASA Lookup activity query for a SharePoint Online list: filtering by modified date, content type, whether an item is the current version, and so on.
Scenario:
Many organizations like to save data, especially metadata, on a SharePoint Online site. To incrementally extract data that was modified recently or within certain date ranges from SharePoint Online (SPO), we need to filter on the modified date and inspect whether an item is the latest version.
For example, there are items (documents, folders, etc.) residing on SharePoint Online, and their properties look like this:
We want to know whether they were modified after a certain date, whether they are the latest version, whether an item is a document or a folder, and so on. When we retrieve them from SharePoint Online, we get a JSON response to check.
Let’s begin.
Solution:
In this article, we focus on the Lookup activity only, especially on the lookup query content. I will not only ignore the Lookup activity's other configurations but also skip the other activities and steps in the pipeline, such as how to access SPO, how to extract data from SPO, and how to sink it to the destination.
If you are interested in those and want to know more in detail, please review my previous articles:
2) Check items on SPO modified after a certain date and of type "Document"
Lookup activity: Lookup_DnA_spo_Sources_array
This Lookup activity filters the items saved in the SharePoint library by:
ContentType = Document;
file saving path = /sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 - Raw Data (that means I look up only the files saved at this path);
file's Modified date >= the pre-set offset day
@concat('$filter=ContentType eq ','''Document''', ' and Path eq ','''/sites/AnalyticsandDataGovernance/Shared Documents/DA27-PanCanada Major Projects Data Automation/04 - Raw Data''', ' and ','Modified ge datetime''',formatDateTime(addDays(utcNow(),json(activity('lkp metadata of Source to Landing from SPO').output.value[0].SourceParameterJSON).pl_Inspecting_Offset_Day),'yyyy-MM-dd'),'''')
Here I use the "offset" concept; it is a property I save in the SPO list. Of course, you can provide this offset value in many ways, such as a pipeline parameter, a value saved in a SQL table, or a file; wherever you like.
For example, if you incrementally ingest data on a daily basis, the offset = -1; on a weekly basis, offset = -7; for a ten-day or other customized period, offset = -10, and so on.
One more example: if you want to check whether items saved in SPO are the current version ("IsCurrentVersion") and of type "Document", you can use a query like the sketch below.
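A hedged sketch of such a query, following the same pattern as above; the IsCurrentVersion property name is assumed to match the field exposed by your SPO list.
@concat('$filter=ContentType eq ','''Document''',' and IsCurrentVersion eq true')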