Often, when you define your pipeline data flow, you face a situation where some activities should be executed only under specific conditions, e.g. depending on the result of a previous activity or the value of an input parameter. In other cases, some activities should be repeated many times in slightly different contexts, e.g. when you want to load data to different locations in Blob storage. ADF V1 did not support these scenarios. However, Azure Data Factory V2 has finally closed this gap!
Welcome to my third post about Azure Data Factory V2. This time I will focus on control flow activities which let us conditionally define pipeline workflows. Additionally, I will cover parameters and show how to combine them within the pipeline.
Let’s get right to it and see what new activities we have.
The new activities are here!
With Azure Data Factory V2, Microsoft introduced several new activities which I will describe in short.
The If Condition activity evaluates a Boolean expression. Depending on the result (true or false), the pipeline invokes the corresponding set of activities. This activity behaves like a typical “if” statement in many programming languages.
For instance, you can provide the following expression as a condition (using pipeline parameters): @bool(pipeline().parameters.paramValue). When you run the pipeline, you can set the parameter value, and the pipeline then executes the branch of activities that matches that value.
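As a rough sketch, an If Condition activity built around that expression could be defined as below. The definition is assembled as a Python dictionary and printed as JSON. The activity names (CheckParamValue, WhenTrue, WhenFalse) and the placeholder Wait activities in each branch are assumptions made for illustration, and the property names follow the ADF V2 JSON schema as I understand it, so check them against the official documentation.

```python
import json

# Hypothetical activity names; paramValue is the pipeline parameter from the text.
if_condition = {
    "name": "CheckParamValue",
    "type": "IfCondition",
    "typeProperties": {
        # The Boolean expression that selects the branch.
        "expression": {
            "value": "@bool(pipeline().parameters.paramValue)",
            "type": "Expression",
        },
        # Placeholder activities: each branch simply waits for one second.
        "ifTrueActivities": [
            {"name": "WhenTrue", "type": "Wait",
             "typeProperties": {"waitTimeInSeconds": 1}}
        ],
        "ifFalseActivities": [
            {"name": "WhenFalse", "type": "Wait",
             "typeProperties": {"waitTimeInSeconds": 1}}
        ],
    },
}

print(json.dumps(if_condition, indent=2))
```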
As with the If Condition activity, the For Each activity concept is similar to that in programming languages. It allows you to iterate over a specific set of items and repeat a set of activities.
Imagine a situation where you have to copy files to multiple locations within Blob storage. To achieve this, add a For Each loop whose input is a collection of the desired destination folder paths. The loop iterates over the items and uses each one as the destination path for the Copy activity inside the loop.
When you have a lot of items to iterate over, you can speed up the execution by setting the isSequential property to false. This switches the loop to parallel execution. The current limit is 20 concurrent iterations.
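The loop described above might look roughly like the following definition, again built as a Python dictionary. The names CopyToEachFolder, CopyToFolder, BlobSink, destinationFolders and folderPath are hypothetical, and the Copy activity is abbreviated (its input dataset, source and sink settings are omitted), so treat this as a sketch rather than a deployable definition.

```python
import json

for_each = {
    "name": "CopyToEachFolder",
    "type": "ForEach",
    "typeProperties": {
        # False switches the loop from sequential to parallel execution.
        "isSequential": False,
        # The collection to iterate over: an array-typed pipeline parameter.
        "items": {
            "value": "@pipeline().parameters.destinationFolders",
            "type": "Expression",
        },
        "activities": [
            {
                "name": "CopyToFolder",
                "type": "Copy",
                # Abbreviated: inputs and source/sink settings are left out.
                "outputs": [
                    {
                        "referenceName": "BlobSink",
                        "type": "DatasetReference",
                        # @item() refers to the current element of the loop.
                        "parameters": {"folderPath": "@item()"},
                    }
                ],
            }
        ],
    },
}

print(json.dumps(for_each, indent=2))
```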
The Until activity is very similar to For Each and If Condition. It is also a loop, but in this case the activities inside it are repeated until the specified condition evaluates to true.
An example scenario is calling a web service (Web activity) repeatedly, every 10 seconds. To wait 10 seconds between calls, use the Wait activity.
The Wait activity is very simple to use. It defines, in seconds, how long the pipeline should pause before it continues with the subsequent activities. All you have to do is set the value of the waitTimeInSeconds property.
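Putting the Until, Web and Wait activities together, the polling scenario could be sketched as follows. The URL, the activity names and the shape of the web service response (a status field that eventually equals 'done') are all assumptions made for illustration.

```python
import json

until_loop = {
    "name": "PollService",
    "type": "Until",
    "typeProperties": {
        # The loop stops once this expression evaluates to true.
        # The 'status' field in the response is a hypothetical example.
        "expression": {
            "value": "@equals(activity('CallService').output.status, 'done')",
            "type": "Expression",
        },
        "activities": [
            {
                "name": "CallService",
                "type": "WebActivity",
                "typeProperties": {
                    "url": "https://example.com/status",  # placeholder URL
                    "method": "GET",
                },
            },
            {
                "name": "Pause",
                "type": "Wait",
                # Run the wait only after the web call has succeeded.
                "dependsOn": [
                    {"activity": "CallService",
                     "dependencyConditions": ["Succeeded"]}
                ],
                "typeProperties": {"waitTimeInSeconds": 10},
            },
        ],
    },
}

print(json.dumps(until_loop, indent=2))
```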
Lookup is another interesting activity. You can use it to read values from an external source and use them as input for your pipeline. The output of the Lookup activity is then available to subsequent activities.
An example scenario is using Lookup to read values from an Azure SQL Database table and passing them as the collection through which a For Each loop iterates. In other words, you can use the Lookup activity to read the configuration for your pipeline.
Currently, the feature supports several sources:
- Files stored on Azure Blob or File System (file must be formatted as JSON)
- Azure SQL Database, Azure SQL Data Warehouse, SQL Server
- Azure Table storage.
Another limitation is the size of the result: the Lookup activity returns at most 5000 rows, and the output cannot exceed 10 MB. The output is formatted as JSON, i.e. an object or an array of objects.
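The Lookup plus For Each scenario could be wired together roughly as shown below. The dataset name ConfigTable, the query, the FolderPath column and the activity names are hypothetical, and the source type AzureSqlSource and the firstRowOnly property reflect my understanding of the current schema, which may differ from the version available when this post was written.

```python
import json

lookup = {
    "name": "LookupFolders",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "AzureSqlSource",
            # Hypothetical configuration table and column.
            "sqlReaderQuery": "SELECT FolderPath FROM dbo.PipelineConfig",
        },
        "dataset": {"referenceName": "ConfigTable", "type": "DatasetReference"},
        # Return all rows as an array instead of only the first row.
        "firstRowOnly": False,
    },
}

for_each = {
    "name": "ForEachFolder",
    "type": "ForEach",
    # Start iterating only after the lookup has succeeded.
    "dependsOn": [
        {"activity": "LookupFolders", "dependencyConditions": ["Succeeded"]}
    ],
    "typeProperties": {
        # The array of rows returned by the Lookup activity.
        "items": {
            "value": "@activity('LookupFolders').output.value",
            "type": "Expression",
        },
        # Inner activities omitted; each row would be read as @item().FolderPath.
        "activities": [],
    },
}

print(json.dumps([lookup, for_each], indent=2))
```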
With the Web activity, you can call any REST API. For more details about the Web activity, please refer to the official documentation.
The Get Metadata activity retrieves metadata about data referenced in Azure Data Factory. At the time of writing this article, it supports retrieving metadata only from Blob datasets. You can retrieve information on the dataset size, structure and last modified time. Please refer to the official documentation for more details.
The Execute Pipeline activity lets you run another pipeline (the child) from an existing one (the parent). This is especially useful when your pipeline has grown and contains repeatable workflow steps. In that case, it makes sense to move such a piece of workflow into a separate pipeline and reuse it wherever possible.
Additionally, it is possible to make this “brick” more generic if you define the appropriate parameters. You can pass their values from parent pipeline to child pipeline.
One more thing worth mentioning is the waitOnCompletion property. It defines whether the parent pipeline should wait for the child pipeline to finish execution before it continues with its own subsequent activities.
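A minimal sketch of an Execute Pipeline activity that passes a parameter to the child and waits for it to finish could look like this. The names RunChildPipeline, ChildPipeline and targetFolder are hypothetical.

```python
import json

execute_pipeline = {
    "name": "RunChildPipeline",
    "type": "ExecutePipeline",
    "typeProperties": {
        # The child pipeline to invoke.
        "pipeline": {
            "referenceName": "ChildPipeline",
            "type": "PipelineReference",
        },
        # Pass the parent's parameter value down to the child's parameter.
        "parameters": {
            "targetFolder": {
                "value": "@pipeline().parameters.targetFolder",
                "type": "Expression",
            }
        },
        # True: the parent waits until the child run has finished.
        "waitOnCompletion": True,
    },
}

print(json.dumps(execute_pipeline, indent=2))
```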
On Success, Failure and Completion
The above activities are not the only way to control pipeline flow. With Azure Data Factory V2, you can also define which activity should be executed next depending on the execution result of the current one. You define the workflow path by selecting an activity and clicking the “+” button. Here you have three options:
- Success – go to next activity when the current execution ends with success.
- Failure – go to next activity when the current execution ends with error (failure).
- Completion – go to next activity when the current execution ends, no matter the result. It can be failure or success.
In the example below, you can see three dependency lines: green (success), red (failure) and blue (completion). In the end, however, there are only two possible execution paths:
- If the Execute Pipeline activity succeeds, the Stored Procedure activity and the Copy activity will be executed.
- If the Execute Pipeline activity fails, the U-SQL activity and the Copy activity will be executed.
As you can see, the pipeline will always execute the Copy activity, no matter what the result of the Execute Pipeline activity is.
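The two paths can be checked with a small simulation. The sketch below is a simplified model, not the ADF scheduler: it only looks at direct dependencies on a single upstream activity. The activity names are hypothetical, while the dependsOn structure with the conditions Succeeded, Failed and Completed mirrors how dependencies are expressed in the pipeline JSON, as far as I know.

```python
# Which upstream results satisfy each dependency condition.
SATISFIED_BY = {
    "Succeeded": {"Succeeded"},
    "Failed": {"Failed"},
    "Completed": {"Succeeded", "Failed"},
}

# Three activities that all depend on the same Execute Pipeline activity.
activities = [
    {"name": "RunStoredProcedure",
     "dependsOn": [{"activity": "RunChildPipeline",
                    "dependencyConditions": ["Succeeded"]}]},
    {"name": "RunUSql",
     "dependsOn": [{"activity": "RunChildPipeline",
                    "dependencyConditions": ["Failed"]}]},
    {"name": "CopyData",
     "dependsOn": [{"activity": "RunChildPipeline",
                    "dependencyConditions": ["Completed"]}]},
]


def activities_to_run(upstream_result):
    """Return the names of activities whose dependency condition is met."""
    runnable = []
    for activity in activities:
        for dependency in activity["dependsOn"]:
            conditions = dependency["dependencyConditions"]
            if any(upstream_result in SATISFIED_BY[c] for c in conditions):
                runnable.append(activity["name"])
    return runnable


print(activities_to_run("Succeeded"))  # ['RunStoredProcedure', 'CopyData']
print(activities_to_run("Failed"))     # ['RunUSql', 'CopyData']
```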
Great! We’ve gained some knowledge about the new activities and pipeline workflow. Now, let’s discuss parameters.
Parameters make the pipeline more flexible. We define parameters at the pipeline level. To do so, select the pipeline and switch to the Parameters tab. Then click the + New button, provide the parameter name, specify its type and provide a default value.
To refer to the value of a defined pipeline parameter, use the expression @pipeline().parameters.ParameterName, where ParameterName is the name you gave the parameter.
The syntax is quite simple. For a more detailed description, please refer to the official documentation.
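In the pipeline's JSON definition, parameters declared in the UI end up in a parameters section. The sketch below shows roughly what that could look like, together with a small helper that builds the reference expression. The pipeline name, the parameter destinationFolder, its default value and the helper parameter_reference are all hypothetical.

```python
import json


def parameter_reference(name):
    """Build the expression that refers to a pipeline parameter."""
    return f"@pipeline().parameters.{name}"


pipeline = {
    "name": "ParameterizedPipeline",
    "properties": {
        # Each parameter has a type and, optionally, a default value.
        "parameters": {
            "destinationFolder": {
                "type": "String",
                "defaultValue": "output/default",
            }
        },
        "activities": [],  # activities omitted for brevity
    },
}

print(json.dumps(pipeline, indent=2))
print(parameter_reference("destinationFolder"))
# @pipeline().parameters.destinationFolder
```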
We know how to define parameters and how to refer to them. We can use them in the following situations:
- to set the property value of activities, e.g. as a condition in the If Condition activity, or to set Data Movement Units in the Copy activity
- to set the property value of datasets, e.g. file path, file format, stored procedure or table name.
You can assign a parameter value to a specific property (of an activity or a dataset) in two ways.
Assigning parameter value directly from a specific property
For example, suppose you have defined an Azure Blob dataset. Go to the Connection tab and place the cursor in the File Path field; an Add dynamic content link should appear.
Once you click on it, the Insert Dynamic Content panel should appear. At the bottom, you should see the Parameters section with all parameters defined within the pipeline. Double-click a specific parameter to insert its expression rather than writing it by hand.
Here you can also build a more advanced expression using the expression language, with its set of built-in functions (date, time, string functions etc.) and system variables.
Use the Parameters tab
The second option is to use the Parameters tab on a specific activity or dataset. Here you have a list of properties for which you can set parameters. As you probably noticed, these properties are available in other tabs (within a specific activity or dataset), but here you can find them all in one place. It is more convenient to set properties from here rather than search for them one by one on separate tabs. Notice that you cannot set every dataset or activity property value dynamically through parameters, e.g. linked service for the dataset.
When all is done, i.e. the parameters are defined and referenced in the appropriate properties, run your pipeline. On the Pipeline Run panel, you will see a list of all defined parameters with their default values. This is where you can change them and test the pipeline with the new parameter values.
In this post, I went through several new activities introduced to Azure Data Factory V2. These activities significantly improve the possibilities for building a more advanced pipeline workflow logic. Additionally, it is possible to define a pipeline workflow path based on activity completion result.
In the second part, I described pipeline parameters and showed how to use them to set specific properties within datasets or activities.
We are getting closer to the end. In my next and final post, I will describe the SSIS Integration Runtime and tell you a little bit more about triggers. See you next time!