Azure Data Factory (ADF) is a fully managed, serverless data integration service for ingesting, preparing, and transforming large volumes of data. An Azure subscription can contain one or more Azure Data Factory instances, called data factories. One of the fundamental components of Azure Data Factory is the pipeline. In Azure Data Factory and Azure Synapse, a single execution of a pipeline is called a pipeline run.
Pipeline runs are typically initiated by passing arguments to the parameters defined in the pipeline. A pipeline can be executed manually (on demand) or by a trigger; this article focuses on starting runs on demand through the Azure APIs. In this example Web MVC application, we obtain a list of pipelines from the Azure API, and each pipeline in the list can be run. When the “Run” button is clicked, the status changes as the run progresses: it typically shows “Queued” or “InProgress” while the run is active and finally “Succeeded”, or “Failed” if the run does not complete successfully.
Now, let’s look at the logic used to start a pipeline and update its status. Before any pipeline is triggered, the controller retrieves all pipeline names from the Azure API.
The names are stored as Pipeline entities, which are then displayed in the grid.
[HttpGet]
public IActionResult GetRunnablePipelineList()
{
    // Serve the stored list if the pipelines were already loaded into the database.
    var result = _context.Pipelines.AsNoTracking();
    if (result.Any())
    {
        return Json(new { data = result });
    }

    // Otherwise, request a token and fetch the pipeline names from Azure.
    var token = ADFServices.GetToken(_pipelineConfigure.TenantId, _pipelineConfigure.GrantType, _pipelineConfigure.ClientId, _pipelineConfigure.ClientSecret, _pipelineConfigure.Resource);
    var param = new PipelineParam
    {
        FactoryName = _config.FactoryName,
        ResourceGroupName = _config.ResourceGroupName,
        SubscriptionId = _config.SubscriptionId,
        Token = token,
    };
    var pipelines = ADFServices.GetPipelines(param);

    // Map each pipeline name to a Pipeline entity.
    var ret = new List<Pipeline>();
    foreach (var item in pipelines)
    {
        Pipeline pipeline = new Pipeline
        {
            Name = item,
            DisplayName = "",
            CreatedBy = "admin@gmail.com",
            CreatedOn = DateTime.UtcNow,
            LastModifiedBy = "",
            IsRunnable = false,
            LastModifiedOn = null
        };
        ret.Add(pipeline);
    }

    // Persist the entities; without SaveChanges the query below would return nothing.
    _context.Pipelines.AddRange(ret);
    _context.SaveChanges();
    return Json(new { data = _context.Pipelines.AsNoTracking() });
}
Next, when the “Run” button is clicked, the controller uses the pipelineId passed as a parameter to look up the name of the pipeline to execute, and then triggers that pipeline. The call returns a “runId”, which the controller uses to fetch the run’s current status and store it on the pipeline record.
[HttpPost]
public async Task<IActionResult> RunPipelineStandalone(int pipelineId)
{
    var token = ADFServices.GetToken(_pipelineConfigure.TenantId, _pipelineConfigure.GrantType, _pipelineConfigure.ClientId, _pipelineConfigure.ClientSecret, _pipelineConfigure.Resource);
    ServiceClientCredentials cred = new TokenCredentials(token);
    using var client = new DataFactoryManagementClient(cred) { SubscriptionId = _config.SubscriptionId };

    // Look up the pipeline selected in the grid.
    var runnable_pipeline = _context.Pipelines.AsNoTracking().FirstOrDefault(x => x.Id == pipelineId);
    if (runnable_pipeline != null)
    {
        // Start a run; the response carries the ID of the new run.
        CreateRunResponse runResponse = await client.Pipelines.CreateRunAsync(_config.ResourceGroupName, _config.FactoryName, runnable_pipeline.Name);

        // Read the run's current status (for example Queued or InProgress).
        PipelineRun pipelineRun = await client.PipelineRuns.GetAsync(_config.ResourceGroupName, _config.FactoryName, runResponse.RunId);

        runnable_pipeline.LastRunId = runResponse.RunId;
        runnable_pipeline.LastRunBy = "admin@gmail.com";
        runnable_pipeline.LastRunDate = DateTime.UtcNow;
        runnable_pipeline.Status = pipelineRun.Status;
        _context.Pipelines.Update(runnable_pipeline);
        await _context.SaveChangesAsync();
    }
    return Json(new { Success = true });
}
The code above relies on the ADFServices class, a service class that provides methods such as GetPipelines and GetToken, together with the Pipeline entity model and the PipelineParam model. Adapt these classes to your own requirements.
We will now create a Demo Web MVC application to demonstrate how to trigger a pipeline.
Configure Demo Web App:
First, install the following NuGet packages:
Newtonsoft.Json
RestSharp
Microsoft.Azure.Management.DataFactory
In an SDK-style project, these packages appear as PackageReference entries in an <ItemGroup> section of the web project’s .csproj file.
Store all of your pipeline credentials in the appsettings.json file by adding the following section:
"PipelineConfigure": {
  "TenantId": "...",
  "ClientId": "...",
  "ClientSecret": "..",
  "FactoryName": "...",
  "ResourceGroupName": "..",
  "SubscriptionId": "..",
  "GrantType": "..",
  "Resource": ".."
},
To bind these values from appsettings.json, create a class with a property for each key:
public class PipelineConfigure
{
    public string TenantId { get; set; }
    public string ClientId { get; set; }
    public string ClientSecret { get; set; }
    public string FactoryName { get; set; }
    public string ResourceGroupName { get; set; }
    public string SubscriptionId { get; set; }
    public string GrantType { get; set; }
    public string Resource { get; set; }
}
Next, let’s register the bound settings object as a service so it can be injected where needed:
services.AddSingleton(_configuration.GetSection(nameof(PipelineConfigure)).Get<PipelineConfigure>());
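Because a missing or misspelled key in appsettings.json only surfaces later as a failed token request, it can help to validate the bound settings once at startup. The following is a minimal sketch under that assumption; SettingsCheck and MissingSettings are hypothetical names, not part of any library. It uses reflection to list every public string property that is null or blank.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper: reports settings properties that were not filled in.
public static class SettingsCheck
{
    // Returns the names of public string properties that are null, empty, or whitespace.
    public static List<string> MissingSettings<T>(T settings) where T : class
    {
        if (settings == null) throw new ArgumentNullException(nameof(settings));
        return typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Where(p => p.PropertyType == typeof(string))
            .Where(p => string.IsNullOrWhiteSpace((string)p.GetValue(settings)))
            .Select(p => p.Name)
            .ToList();
    }
}

// Possible use during service registration (assumption, adapt to your startup code):
// var settings = _configuration.GetSection(nameof(PipelineConfigure)).Get<PipelineConfigure>();
// var missing = SettingsCheck.MissingSettings(settings);
// if (missing.Count > 0)
//     throw new InvalidOperationException("Missing PipelineConfigure keys: " + string.Join(", ", missing));
// services.AddSingleton(settings);
```

Note that Get<PipelineConfigure>() returns null when the section is absent entirely, which this sketch reports as an ArgumentNullException.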
To call the Azure Data Factory REST API, we first need an access token, which we request over HTTP with RestSharp. For this, we use a small extension method that wraps RestSharp’s callback-based client.ExecuteAsync(request, callback) in a Task, so the response can be awaited instead of handled in a callback.
public static class RestSharpHelper
{
    // Wraps RestSharp's callback-based ExecuteAsync in an awaitable Task.
    public static Task<IRestResponse<T>> ExecuteAsyncRequest<T>(this RestClient client, IRestRequest request) where T : class, new()
    {
        var taskCompletionSource = new TaskCompletionSource<IRestResponse<T>>();
        client.ExecuteAsync<T>(request, restResponse =>
        {
            if (restResponse.ErrorException != null)
            {
                // Throwing inside the callback would leave the Task pending forever,
                // so the error is handed to the Task instead.
                const string message = "Error retrieving response.";
                taskCompletionSource.SetException(new ApplicationException(message, restResponse.ErrorException));
                return;
            }
            taskCompletionSource.SetResult(restResponse);
        });
        return taskCompletionSource.Task;
    }
}
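The helper is an instance of a general pattern: adapting a callback-based API to a Task with TaskCompletionSource. The sketch below shows the pattern without RestSharp; the CallbackApi delegate is a hypothetical stand-in for RestSharp’s ExecuteAsync. The point it illustrates is that errors must reach the awaiting code through TrySetException rather than a throw inside the callback.

```csharp
using System;
using System.Threading.Tasks;

public static class CallbackAdapter
{
    // Hypothetical callback-style API: calls onDone with either a result or an error.
    public delegate void CallbackApi<T>(Action<T, Exception> onDone);

    public static Task<T> ToTask<T>(CallbackApi<T> api)
    {
        // RunContinuationsAsynchronously keeps awaiting code from running inside the callback.
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        api((result, error) =>
        {
            if (error != null)
            {
                tcs.TrySetException(error); // the awaiting caller observes this exception
            }
            else
            {
                tcs.TrySetResult(result);
            }
        });
        return tcs.Task;
    }
}
```

The Try* variants are used so that a callback invoked twice by mistake does not throw a second time.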
Next, we create a service class, ADFServices, that calls the various Azure Data Factory pipeline APIs. The PipelineConfigure settings are supplied to this class through constructor injection.
public class ADFServices
{
    private readonly PipelineConfigure _pipelineConfigure;

    public ADFServices(PipelineConfigure pipelineConfigure)
    {
        _pipelineConfigure = pipelineConfigure;
    }
}
Every request to the Azure Data Factory REST API must be authenticated with an access token, so the first step is to obtain one. Let’s create a model to receive the token response:
public class TokenResponse
{
public string token_type { get; set; }
public int expires_in { get; set; }
public int ext_expires_in { get; set; }
public string access_token { get; set; }
}
Now, let’s proceed by following a few steps:
Generate Token:
We use RestSharp’s RestClient to call the Azure Active Directory (Microsoft Entra ID) token endpoint and obtain a token with the service principal credentials stored in PipelineConfigure.
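// Azure AD v1.0 token endpoint; it accepts the "resource" parameter used below.
var url = $"https://login.microsoftonline.com/{_pipelineConfigure.TenantId}/oauth2/token";
var client = new RestClient(url);
var request = new RestRequest(Method.POST);
request.AddHeader("Content-Type", "application/x-www-form-urlencoded");
request.AddParameter("grant_type", _pipelineConfigure.GrantType);
request.AddParameter("client_id", _pipelineConfigure.ClientId);
request.AddParameter("client_secret", _pipelineConfigure.ClientSecret);
request.AddParameter("resource", _pipelineConfigure.Resource);
IRestResponse<TokenResponse> restResponse = await client.ExecuteAsyncRequest<TokenResponse>(request);
string token = restResponse.Data.access_token;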
The response looks like this:
{"token_type":"Bearer","expires_in":"3599","ext_expires_in":"3599","expires_on":"1687030768","not_before":"1687026868","resource":"https://management.azure.com/","access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6Ii"}
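The sample response shows an expires_in of roughly one hour (returned here as a string), yet the controller actions above request a new token on every call. A minimal sketch of caching the token until shortly before it expires follows. TokenCache is a hypothetical helper, it parses the raw response with System.Text.Json instead of RestSharp’s deserializer, and the safety margin is an arbitrary choice left to the caller.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

// Hypothetical helper: keeps one access token until shortly before it expires.
public class TokenCache
{
    private readonly TimeSpan _margin;
    private string _token;
    private DateTimeOffset _expiresAt;

    public TokenCache(TimeSpan margin) => _margin = margin;

    // Stores the token from a raw token-endpoint response received at 'now'.
    public void Store(string json, DateTimeOffset now)
    {
        using var doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;
        JsonElement expires = root.GetProperty("expires_in");
        // The sample response returns expires_in as a string ("3599"), so accept both forms.
        int seconds = expires.ValueKind == JsonValueKind.String
            ? int.Parse(expires.GetString(), CultureInfo.InvariantCulture)
            : expires.GetInt32();
        _token = root.GetProperty("access_token").GetString();
        _expiresAt = now.AddSeconds(seconds);
    }

    // Returns the cached token, or null if none is stored or it is within the margin of expiry.
    public string TryGet(DateTimeOffset now) =>
        _token != null && now < _expiresAt - _margin ? _token : null;
}
```

In practice you would call TryGet(DateTimeOffset.UtcNow) before each API call and request a new token only when it returns null.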
Retrieve Pipelines:
Once the token has been generated, the next step is to retrieve the list of pipelines in the factory; each pipeline in the list can then be run with the same token. Let’s create a model to receive the response:
public class PipelineResponse
{
    public List<Value> Value { get; set; }
}
public class Value
{
    public string Id { get; set; }
    public string Name { get; set; }
    public string Type { get; set; }
    public dynamic Properties { get; set; }
    public string Etag { get; set; }
}
Now, let’s make a request to the Azure Resource Manager REST API with RestSharp to retrieve the list of pipelines:
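// Pipelines - List By Factory endpoint; param is the PipelineParam passed to ADFServices.GetPipelines.
var url = $"https://management.azure.com/subscriptions/{param.SubscriptionId}/resourceGroups/{param.ResourceGroupName}/providers/Microsoft.DataFactory/factories/{param.FactoryName}/pipelines?api-version=2018-06-01";
var client = new RestClient(url);
var request = new RestRequest(Method.GET);
request.AddHeader("Content-Type", "application/json");
request.AddHeader("Authorization", $"Bearer {param.Token}");
IRestResponse<PipelineResponse> restResponse = await client.ExecuteAsyncRequest<PipelineResponse>(request);
List<string> pipelineNames = restResponse.Data.Value.Select(p => p.Name).ToList();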
The response looks like this; the “name” field (“Get Contacts” here) is the pipeline name:
{
"value": [
{
"id": "/subscriptions/",
"name": "Get Contacts",
"type": "Microsoft.DataFactory/factories/pipelines",
"properties": {
"activities": [],
"policy": {},
"folder": {},
"annotations": [],
"lastPublishTime": "2022-04-02T22:52:26Z"
},
"etag": "a200e0d6-0000-0d00-0000-6248d3aa0000"
}
]
}
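A list response can also carry a nextLink property when more pipelines exist than fit in one page. A minimal sketch of reading one page, assuming only the value/name/nextLink shape of the response, is shown below; PipelineListParser is a hypothetical helper that uses System.Text.Json instead of the PipelineResponse model.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical helper: reads one page of the pipeline list response.
public static class PipelineListParser
{
    // Returns the pipeline names and the nextLink URL (null when this is the last page).
    public static (List<string> Names, string NextLink) Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;
        List<string> names = root.GetProperty("value")
            .EnumerateArray()
            .Select(p => p.GetProperty("name").GetString())
            .ToList(); // materialize before the document is disposed
        string nextLink = root.TryGetProperty("nextLink", out JsonElement link) ? link.GetString() : null;
        return (names, nextLink);
    }
}
```

To collect every page, request NextLink with the same Authorization header and repeat until it is null.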
Run Pipeline:
To trigger a pipeline, we create a run with the Data Factory management SDK (Microsoft.Azure.Management.DataFactory). The call returns a CreateRunResponse whose RunId identifies the new run:
ServiceClientCredentials cred = new TokenCredentials(your_token);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = your_SubscriptionId };
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(your_ResourceGroupName, your_FactoryName, your_PipelineName).Result.Body;
The response looks like this:
{
"runId": "d9017220caf7488cb50d803667cd1143"
}
Get Pipeline Status:
Once the run has been created, we retrieve its current status by passing the runId to PipelineRuns.Get:
ServiceClientCredentials cred = new TokenCredentials(your_token);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = your_SubscriptionId };
PipelineRun pipelineRun = client.PipelineRuns.Get(your_ResourceGroupName, your_FactoryName, your_runId);
The response looks like this:
{
"runId": "4e8cbf7e-3e8a-4e61-8fcf-ad6a7cb2fda9",
"runGroupId": "4e8cbf7e-3e8a-4e61-8fcf-ad6a7cb2fda9",
"isLatest": true,
"pipelineName": "Excel from SharePoint to ADLS",
"parameters": {},
"runDimensions": null,
"invokedBy": {
"name": "Manual",
"id": "5aeb4ff06c584153800a802d772696b5",
"invokedByType": "Manual",
"pipelineName": null,
"pipelineRunId": null
},
"lastUpdated": "2023-07-07T21:00:01.1459149Z",
"runStart": "2023-07-07T21:00:00.2289229Z",
"runEnd": null,
"durationInMs": null,
"status": "InProgress",
"message": "",
"id": "/SUBSCRIPTIONS/315C229B-CB45-46DD-872B-9E54492EBB24/RESOURCEGROUPS/RG-NS-DF-TEST/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/DF-NS-TEST/pipelineruns/4e8cbf7e-3e8a-4e61-8fcf-ad6a7cb2fda9",
"debugRunId": null,
"pipelineReturnValue": null,
"annotations": [],
"runDimension": {}
}
The status field reports the state of the pipeline run and can be one of the following: Queued, InProgress, Succeeded, Failed, Canceling, or Cancelled.
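Because the controller reads the status only once, right after starting the run, the stored value will usually still be Queued or InProgress. One way to obtain the final result is to poll until the run reaches a terminal status (Succeeded, Failed, or Cancelled). The sketch below is an assumption-based helper, not part of the SDK: PipelineRunPoller is a hypothetical name, the polling interval is arbitrary, and the status lookup is passed in as a delegate so the loop can be exercised without Azure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the Data Factory SDK).
public static class PipelineRunPoller
{
    // Terminal states taken from the status list above.
    private static readonly HashSet<string> TerminalStatuses =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "Succeeded", "Failed", "Cancelled" };

    public static bool IsTerminal(string status) =>
        status != null && TerminalStatuses.Contains(status);

    // Calls getStatus until it returns a terminal status, waiting 'interval' between calls.
    public static async Task<string> WaitForCompletionAsync(
        Func<CancellationToken, Task<string>> getStatus,
        TimeSpan interval,
        CancellationToken cancellationToken = default)
    {
        while (true)
        {
            string status = await getStatus(cancellationToken);
            if (IsTerminal(status))
            {
                return status;
            }
            await Task.Delay(interval, cancellationToken);
        }
    }
}

// Possible use with the management client (names as in the snippets above):
// string finalStatus = await PipelineRunPoller.WaitForCompletionAsync(
//     async ct => (await client.PipelineRuns.GetAsync(your_ResourceGroupName, your_FactoryName, your_runId, ct)).Status,
//     TimeSpan.FromSeconds(15));
```

Holding an HTTP request open until a long pipeline finishes is rarely desirable, so in a web app this loop would more naturally run in a background task, with the grid reading the stored status.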
Conclusion:
Azure Data Factory pipelines can be scheduled to run at specific intervals or triggered based on events. Pipeline runs provide monitoring and logging capabilities, allowing you to track the progress, performance, and status of your data integration workflows.