Introduction
In this article, we will explore how to use Azure Data Factory's Mapping Data Flow to transform data. Mapping Data Flow runs on scaled-out Apache Spark clusters, similar to Databricks, but provides data engineers with a graphical designer to build transformation logic with little to no code.
Challenges
In our previous article, we created an Azure Data Factory instance with a managed virtual network, and we want to leverage this service for our data transformation.
One of the common tasks a data engineer gets is to create aggregations over a set of data. We will demonstrate this using the WideWorldImporters sample database provided by Microsoft. You can set up the database by following the link in the References section.
In this database, we are interested in two tables:
- Sales.Invoices: This table contains the details of customer invoices, including the invoice date.
- Sales.InvoiceLines: This table contains the line items of the customer invoices, including the profit for each line item.
By joining the two tables, we can calculate the total profit for each invoice date. We will create a new table in the database to store the output, so the values are pre-calculated.
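For reference, here is a minimal T-SQL sketch of the calculation we will reproduce in Mapping Data Flow (column names come from the WideWorldImporters schema):
-- Daily profit per invoice date, the logic we will rebuild as a data flow
SELECT i.InvoiceDate,
       SUM(il.LineProfit) AS DailyProfit
FROM Sales.Invoices AS i
INNER JOIN Sales.InvoiceLines AS il
    ON il.InvoiceID = i.InvoiceID
GROUP BY i.InvoiceDate
ORDER BY i.InvoiceDate;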
Tutorial
Create Database schema and report table
Before we dive into Mapping Data Flow, we will create the schema and the output table first. We will do this using SQL:
-- Create the schema and result table
CREATE SCHEMA [SampleReports];
GO  -- create the schema in its own batch
CREATE TABLE [SampleReports].[DailyProfits] (
    [InvoiceDate] [date] NOT NULL,
    [DailyProfit] [decimal](18, 2) NOT NULL,
    [CreatedWhen] [datetime2](7) NOT NULL,  -- audit: processing timestamp
    [PipelineId] [nvarchar](100) NOT NULL   -- audit: Data Factory pipeline run ID
);
In addition to the data we are interested in, we added two columns for audit purposes:
- CreatedWhen contains the timestamp when the record was created.
- PipelineId contains the Pipeline Run Id from Data Factory.
After executing the SQL statements above, we are ready to go into Azure Data Factory.
Mapping Data Flow
In Azure Data Factory Studio, click on 'Manage'. Under 'Linked services', create a new linked service to your Azure SQL server.
- We selected the Azure Integration Runtime for the Managed VNet and enabled 'Interactive authoring' to test the connection.
- Creating a new 'Managed private endpoint' connection is required. Please refer to my previous article if you need help with this step.
- Click on 'Test connection' before saving to ensure all the configurations are correct.
- For simplicity, I provided the password directly to the linked service. The best practice would be to leverage Managed Identity for the connection; a sketch of the database-side grant this requires follows this list.
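If you do adopt Managed Identity, the data factory's identity also needs a user and roles in the database. A minimal sketch, assuming a factory named 'adf-demo' (replace with your own Data Factory name), run while connected as an Azure AD admin:
-- 'adf-demo' is a placeholder; the managed identity carries the Data Factory's name
CREATE USER [adf-demo] FROM EXTERNAL PROVIDER;
ALTER ROLE db_datareader ADD MEMBER [adf-demo];
ALTER ROLE db_datawriter ADD MEMBER [adf-demo];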
In Azure Data Factory Studio, click on 'Author'. Under Factory Resources, click on the '+' icon and select 'Dataset'.
We will create three datasets, one for each table, by selecting the table from the 'Table name' drop-down:
- SalesInvoices: Sales.Invoices
- SalesInvoiceLines: Sales.InvoiceLines
- ReportDailyProfits: SampleReports.DailyProfits
We should now see three datasets under Factory Resources.
To create a Data flow, we will click on the '+' icon under Factory Resources and select 'Data flow'.
We now have a blank Data flow canvas. We will name our Data flow 'ReportsDataflow'.
Before we start adding components to the data flow, we will turn on 'Data flow debug'. This will allow us to preview the data in the database and the output of any transformation.
We should review the settings in the screenshot:
- We have selected the AutoResolveIntegrationRuntime this time. The advantage is that it enables us to dynamically configure the Compute type and Core count at runtime in a later step.
- We will be paying for a minimum of 1 hour of compute.
- The Core count can be configured in the Manage > Integration runtimes screen.
Let's add our SalesInvoices dataset by clicking on 'Add Source'. We will repeat the same step to add the SalesInvoiceLines dataset.
Let's click on 'Data preview' for the 'salesInvoices' dataset. We can see some of the records in the database.
We will join the two tables together using InvoiceID.
- Click on the '+' next to the 'salesInvoices' and select 'Join' under 'Multiple Inputs/Outputs'.
- Provide an Output stream name. This name will be used as a reference for later steps.
- Select 'salesInvoices' for Left stream.
- Select 'salesInvoiceLines' for the Right stream.
- Select 'Inner' for Join type.
- Under Join conditions, select 'salesInvoices.InvoiceID' and 'salesInvoiceLines.InvoiceID'.
Before we do the aggregation, we first need to remove all the extra columns.
- Click on the '+' next to the 'JoinInvoiceWithLines' and select 'Select' under Schema modifier.
- Provide an Output stream name.
- Select 'JoinInvoiceWithLines' for Incoming Stream. A list of mappings will be displayed under Input columns. If you have 'Auto mapping' enabled, disable it.
- Select all the columns except InvoiceDate and LineProfit, then click 'Delete'.
Let's aggregate the data to create the DailyProfit column.
- Click on the '+' next to the 'SelectColumns' and select 'Aggregate' under Schema modifier.
- Provide an Output stream name.
- Under 'Group by' section, select 'InvoiceDate' as the Column.
- Under 'Aggregates' section, type in 'DailyProfit' under Column and 'sum(LineProfit)' under Expression. If you are unfamiliar with what functions can be called, you can access the Expression Builder for assistance.
Now is a good time to verify that our aggregation is correct. Click on 'Data preview' to see a sample result.
With the sample result validated, we need to add our audit columns. To do so, we need to introduce a Data flow parameter. This parameter will be passed in by the ADF Pipeline in a later step.
- Click on the empty canvas area.
- Under Parameters, click '+ New'.
- Enter 'PipelineId' for Name, 'string' for Type, and '0000-0000' as the Default value.
To create the audit columns:
- Click on the '+' next to the 'DailyProfit' and select 'Derived Column' under Schema modifier.
- Provide an Output stream name.
- Select 'DailyProfit' as the Incoming stream.
- Click on '+ Add' twice to create 2 entries.
- Enter 'CreatedWhen' for Column and 'currentTimestamp()' for Expression. The column will be populated with the processing timestamp.
- Enter 'PipelineId' for Column and '$PipelineId' for Expression. The value will be populated at processing time from the Data flow parameter we created in the last step.
Finally, we will write the result to our destination table; a T-SQL sketch of the equivalent logic follows the steps below.
- Click on the '+' next to the 'AuditColumns' and select 'Sink' under Destination.
- Provide an Output stream name.
- Select 'AuditColumns' as the Incoming stream.
- Select 'ReportDailyProfits' as the Dataset.
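As promised, here is a hedged T-SQL equivalent of what the completed data flow does, shown only to document the logic; the @PipelineId variable stands in for the data flow parameter supplied by the pipeline:
-- For illustration only: the data flow performs this write at runtime
DECLARE @PipelineId nvarchar(100) = '0000-0000';  -- stand-in for the data flow parameter

INSERT INTO [SampleReports].[DailyProfits] (InvoiceDate, DailyProfit, CreatedWhen, PipelineId)
SELECT i.InvoiceDate,
       SUM(il.LineProfit) AS DailyProfit,
       SYSDATETIME() AS CreatedWhen,
       @PipelineId AS PipelineId
FROM Sales.Invoices AS i
INNER JOIN Sales.InvoiceLines AS il
    ON il.InvoiceID = i.InvoiceID
GROUP BY i.InvoiceDate;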
To complete the end-to-end process, we need to create an ADF pipeline. We will not be able to cover the full details in this article, but I will cover three key items:
Settings
As referenced earlier in the tutorial, if we utilize the 'AutoResolveIntegrationRuntime', we can configure the Compute type and Core count on the Data flow activity's Settings tab, as shown in the screenshot.
Parameters
The screenshot shows how to pass the Pipeline Run Id (the @pipeline().RunId system variable) to the data flow's PipelineId parameter.
ADF Pipeline Debug
Run the pipeline debug to ensure the end-to-end process completes and the output is stored in the database. Use your favourite SQL editor to check the result!
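For example, a quick check of the latest output (any query will do):
-- Inspect the rows written by the data flow
SELECT TOP (10) InvoiceDate, DailyProfit, CreatedWhen, PipelineId
FROM [SampleReports].[DailyProfits]
ORDER BY InvoiceDate DESC;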
Summary
With the power of Apache Spark, Azure Data Factory provides data engineers with a single tool to orchestrate and process big data efficiently. The no-code approach enables quicker adoption and offers some similarity to SSIS development.
To productionize the solution above, we also need to consider retry settings and creating a trigger.
Happy Learning!
References