In this article, we will create an Azure Data Factory and a pipeline using the .NET SDK. We will create two linked services and two datasets - one for the source and another for the destination (sink). Here, we will use Azure Blob Storage as the source data store and Azure Cosmos DB as the sink. We will copy the data from a CSV file (stored in Azure Blob Storage) to a Cosmos DB database.
I have already explained Azure Data Factory and how to run a copy activity using the Azure portal in my previous article. Here, we will see how to achieve the same result with the .NET SDK.
Step 1 - Create Azure Blob Storage
Create a resource -> Storage -> Storage account.
Click the “Review + Create” button.
Now, click the “Create” button.
After some time, our storage account will be created successfully. We can go to the resource now.
We can upload one sample CSV file to Blob storage. This CSV file contains employee data with three columns (name, age, and department) and currently holds three rows. Open Storage Explorer and create a new container.
We can right-click “BLOB CONTAINERS” and click “Create Blob Container”.
Give a valid name to the container; the name is case sensitive.
We can upload the CSV file from our local hard drive. Please choose “SAS” as the authentication type.
We have successfully created a Blob Storage and uploaded the CSV file to the blob container.
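For reference, the uploaded employee.csv is a plain comma-separated file with a header row and the three columns mentioned above. The rows below are only illustrative sample values, not the actual file contents.

```
name,age,department
John,30,IT
Mary,28,HR
David,35,Finance
```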
Step 2 - Create Azure Cosmos DB account
Create a resource -> Databases -> Azure Cosmos DB.
Please give a valid name to the Cosmos DB account and choose a resource group. Then click the “Review + Create” button.
After successful validation, click the “Create” button.
We can go to the Cosmos DB account and open the “Data Explorer” tab.
We need to create a new database.
We can create a new collection inside the database. Please give the “name” column as the partition key. (A partition key in Cosmos DB decides how documents are distributed; it is closer to a shard key than to a primary key in an RDBMS.)
We have successfully created a Cosmos DB account, a database, and a collection.
Step 3 - Create Azure Data Factory and Pipeline using .NET SDK
To create any Azure resource from .NET, we must install the corresponding .NET SDK packages first. We need the following information from the Azure portal to create a resource using the .NET SDK.
- Tenant ID
- Subscription ID
- Application ID
- Authentication Key
We can get the Tenant ID from the Azure portal. Click “Azure Active Directory” -> “Properties” and choose the Directory ID.
The Directory ID is the Tenant ID.
Click the “Subscriptions” tab and copy the Subscription ID.
We also need an application ID and an authentication key. We can create a new app registration and get both from it.
Click “Azure Active Directory” -> “App registrations” and click the “New application registration” button.
We must give a valid name to our app registration and choose “Web app / API” as the application type. You can give a dummy URL as the Sign-on URL. Click the “Create” button.
Your app registration will be created shortly. Copy the application ID and save it in a safe place.
You can click the “Settings” button to create an authentication key. Click “Keys”, then give a description and an expiry period. Please note that the key is generated automatically when you save the entry. Click the “Save” button.
Copy the key value and save it to a safe place. We will use this key value later in our .NET application. Please note, you can’t retrieve this key after you leave this blade.
We have successfully got the Tenant ID, Subscription ID, Application ID and Authentication Key from the Azure portal.
We must grant an access role to our app registration. (This is mandatory for creating resources with the .NET SDK.)
Open the subscription, click the “Access control (IAM)” tab, and click the “Add” button to grant a new role assignment.
Choose the previously created app and assign the “Contributor” role.
Create a new .NET Console application.
Save the project.
We can install the “Microsoft.Azure.Management.DataFactory” NuGet Package now.
Open the Package Manager Console and execute the command below.
- Install-Package Microsoft.Azure.Management.DataFactory
Install two more packages.
- Install-Package Microsoft.Azure.Management.ResourceManager -Prerelease
- Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
We can modify the static “Main” method inside the “Program” class to create the Azure Data Factory and pipeline.
This method is executed automatically when the application starts.
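The snippets below assume the following namespaces are imported at the top of Program.cs; the exact list may vary slightly depending on the package versions you installed.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Rest;                                   // ServiceClientCredentials, TokenCredentials
using Microsoft.Rest.Serialization;                     // SafeJsonConvert
using Microsoft.Azure.Management.DataFactory;           // DataFactoryManagementClient
using Microsoft.Azure.Management.DataFactory.Models;    // Factory, LinkedServiceResource, DatasetResource, ...
using Microsoft.IdentityModel.Clients.ActiveDirectory;  // AuthenticationContext, ClientCredential
```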
Set variables
```csharp
string tenantID = "<fill the value>";
string subscriptionId = "<fill the value>";
string applicationId = "<fill the value>";
string authenticationKey = "<fill the value>";
string resourceGroup = "sarath-rg";
string region = "East US";
string dataFactoryName = "sarathadf1";
```
Specify the source Azure Blob information
```csharp
string storageAccount = "sarathstorage";
string storageKey = "<fill the value>";
string inputBlobPath = "sarathcontainer/";
string inputBlobName = "employee.csv";
```
Specify the Azure Cosmos DB information
```csharp
string azureCosmosDBConnString = "AccountEndpoint=https://sarathcosmosdb.documents.azure.com:443/;AccountKey=<account key>;Database=sarathlal";
string azureCosmosDBCollection = "employee";
```
Specify the Linked Service Names and Dataset Names
```csharp
string blobStorageLinkedServiceName = "AzureBlobStorageLinkedService";
string cosmosDbLinkedServiceName = "AzureCosmosDbLinkedService";
string blobDatasetName = "BlobDataset";
string cosmosDbDatasetName = "CosmosDbDataset";
string pipelineName = "SarathADFBlobToCosmosDbCopy";
```
We can authenticate and create a data factory management client
```csharp
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
```
Create data factory and wait.
```csharp
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
{
    System.Threading.Thread.Sleep(1000);
}
```
Create an Azure Blob Storage linked service
```csharp
Console.WriteLine("Creating linked service " + blobStorageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, blobStorageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
```
Create an Azure Cosmos DB linked service
```csharp
Console.WriteLine("Creating linked service " + cosmosDbLinkedServiceName + "...");

LinkedServiceResource cosmosDbLinkedService = new LinkedServiceResource(
    new CosmosDbLinkedService
    {
        ConnectionString = new SecureString(azureCosmosDBConnString)
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, cosmosDbLinkedServiceName, cosmosDbLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(cosmosDbLinkedService, client.SerializationSettings));
```
Create an Azure Blob dataset
```csharp
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = blobStorageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = ",", TreatEmptyAsNull = true, FirstRowAsHeader = true },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "name", Type = "String" },
            new DatasetDataElement { Name = "age", Type = "Int32" },
            new DatasetDataElement { Name = "department", Type = "String" }
        }
    }
);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
```
Create a Cosmos DB Database dataset
```csharp
Console.WriteLine("Creating dataset " + cosmosDbDatasetName + "...");
DatasetResource cosmosDbDataset = new DatasetResource(
    new DocumentDbCollectionDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = cosmosDbLinkedServiceName
        },
        CollectionName = azureCosmosDBCollection
    }
);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, cosmosDbDatasetName, cosmosDbDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(cosmosDbDataset, client.SerializationSettings));
```
Create a Pipeline with Copy Activity (very important)
```csharp
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToCosmosDB",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = cosmosDbDatasetName }
            },
            Source = new BlobSource { },
            Sink = new DocumentDbCollectionSink { }
        }
    }
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));
```
Create a Pipeline Run
```csharp
Console.WriteLine("Creating Pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
```
Monitor the Pipeline Run
```csharp
Console.WriteLine("Checking Pipeline Run Status...");
PipelineRun pipelineRun;
while (true)
{
    pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
    Console.WriteLine("Status: " + pipelineRun.Status);
    // Keep polling while the run is still queued or in progress.
    if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
        System.Threading.Thread.Sleep(15000);
    else
        break;
}
```
Check the Copy Activity Run Details
```csharp
Console.WriteLine("Checking copy activity run details...");
if (pipelineRun.Status == "Succeeded")
{
    Console.WriteLine("Copy Activity Succeeded!");
}
else
{
    Console.WriteLine("Copy Activity Failed!");
}
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
```
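If you want more than a pass/fail message, you can also query the activity runs for this pipeline run to see details such as rows read and written, or the error on failure. Below is a minimal sketch, assuming a recent version of the Microsoft.Azure.Management.DataFactory package that exposes ActivityRuns.QueryByPipelineRun (older versions use a slightly different method signature); adjust it to the SDK version you installed.

```csharp
// Query activity runs for this pipeline run within a time window around now.
RunFilterParameters filterParams = new RunFilterParameters(
    DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, filterParams);

// Output holds the copy statistics on success; Error holds the failure details.
if (pipelineRun.Status == "Succeeded")
    Console.WriteLine(queryResponse.Value[0].Output);
else
    Console.WriteLine(queryResponse.Value[0].Error);
```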
We have completed all the code for creating the Azure Data Factory, the pipeline, the linked services for both input and output, and the datasets. Now we can run the application. It takes a few moments to create all these items, and all the logs are written to the console. Our application executed successfully without any errors.
We can go to the Azure Cosmos DB account and open a new query to check the documents (records) currently available in the employee collection (for example, the query SELECT * FROM c returns all documents).
Please note that our .NET application successfully copied the data from Blob Storage to Cosmos DB. If you check the available resources in the Azure portal, you can see that the new Azure Data Factory is listed there.
Click the Azure Data Factory to open it, then click the “Author & Monitor” button. You can see that there are two datasets and one pipeline available in the data factory.
Each dataset contains the linked service and connection information. If you click the pipeline, you can see all of its details.
Many tabs are available for the pipeline. The Source and Sink tabs contain the dataset and linked service details.
In this article, we created a Blob Storage account and uploaded a CSV file to a blob container. We created a Cosmos DB account along with a database and a collection. We then created an Azure Data Factory and a pipeline, together with the linked services and datasets for the source and the sink. Finally, we executed the console application and confirmed that all the resources were created successfully and that the data was copied from Blob Storage to Cosmos DB.
We will discuss more features of Azure Data Factory in my upcoming articles.