How to load data incrementally using Azure Data Factory


Overview

Incremental loading runs periodically after the initial full load of data and loads only the updated data (delta data).

Note:

There should be a column in the source that identifies the last processed data (for example, a created date or last modified date).

Workflow:

1. Load the full data once and identify the column used to detect the last updated data.
2. Run the pipeline periodically to load the delta data.
3. Copy the delta data to the destination.
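As a sketch of step 2, assuming a hypothetical source table `orders` with a `last_modified_date` watermark column, a MySQL delta query might look like:

```sql
-- Hypothetical delta query: select only rows modified since the last run
-- (here, within the past day, matching a daily pipeline schedule)
SELECT *
FROM orders
WHERE last_modified_date > NOW() - INTERVAL 1 DAY;
```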

Pipeline 

(Figure: Azure Data Factory pipeline for incremental loading)

There are two copy activities and one stored procedure activity in the pipeline. The first activity copies data from the source to a staging table, and the second copies it from staging to the destination. The stored procedure empties the staging table after the second activity completes.
Consider an example where the source is MySQL and the destination is SQL Server.

 {
    "name": "Pipeline",
    "properties": {
        "activities": [
            ...
        ]
    }
 }

In activity 1 we copy data from MySQL to staging. The staging table can live in SQL Server or in Blob storage. When loading the data in this activity, constrain the query so it loads only the delta data - for example, a query that loads rows where created_date is later than yesterday.

 "query": "select * from tablename where created_date > NOW() - INTERVAL 1 DAY"

The output dataset of activity 1 becomes the input of activity 2, which ensures that activity 2 starts only after activity 1 has completed.

{
    "name": "Activity_1",
    "type": "Copy",
    "typeProperties": {
        "source": {
            "type": "RelationalSource",
            "query": ""
        },
        "sink": {
        },
        "translator": {
            "type": "TabularTranslator",
            "columnMappings": ""
        }
    },
    "inputs": [
        {
            "name": "InputDataset_Activity1"
        }
    ],
    "outputs": [
        {
            "name": "OutputDataset_Activity1"
        }
    ],
    "policy": {
        ...
    },
    "scheduler": {
    }
},

Activity 2 copies data from the staging table to the destination table. It has two input datasets: the output dataset of activity 1 and the staging dataset. After activity 2 completes, the stored procedure runs.

{
    "name": "Activity_2",
    "type": "Copy",
    "typeProperties": {
        "source": {
            "type": "RelationalSource",
            "query": ""
        },
        "sink": {
        },
        "translator": {
            "type": "TabularTranslator",
            "columnMappings": ""
        }
    },
    "inputs": [
        {
            "name": "OutputDataset_Activity1"
        },
        {
            "name": "InputDataset_Activity2"
        }
    ],
    "outputs": [
        {
            "name": "OutputDataset_Activity2"
        }
    ],
    "policy": {
        ...
    },
    "scheduler": {
    }
},

On SQL Server, create a stored procedure that truncates the staging table and removes duplicates from the destination table. The stored procedure activity takes the output dataset of activity 2 as its input. A stored procedure activity produces no real output, so you need to create a dummy output dataset, which is used only to specify the schedule for running the activity. When creating the dummy dataset, leave the table name blank.
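As a sketch of such a procedure, assuming a hypothetical staging table dbo.Staging_Orders and destination table dbo.Orders keyed by an order_id column with a last_modified_date column:

```sql
-- Hypothetical cleanup procedure (T-SQL); table and column names are examples
CREATE PROCEDURE dbo.usp_CleanupStaging
AS
BEGIN
    -- Remove duplicate rows from the destination,
    -- keeping the most recently modified row per key
    ;WITH numbered AS (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY order_id
                                  ORDER BY last_modified_date DESC) AS rn
        FROM dbo.Orders
    )
    DELETE FROM numbered WHERE rn > 1;

    -- Empty the staging table for the next run
    TRUNCATE TABLE dbo.Staging_Orders;
END
```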

 {
    "name": "DummyOutputDataset",
    "properties": {
        "type": "SqlServerTable",
        "linkedServiceName": "",
        "structure": [],
        "typeProperties": {
            "tableName": ""
        },
        "availability": {
            "frequency": "",
            "interval": ""
        }
    }
 }

Calling the stored procedure in the pipeline:

{
    "name": "StoredProcedure",
    "type": "SqlServerStoredProcedure",
    "typeProperties": {
        "storedProcedureName": "Procedure_name",
        "storedProcedureParameters": {}
    },
    "inputs": [
        {
            "name": "OutputDataset_Activity2"
        }
    ],
    "outputs": [
        {
            "name": "DummyOutputDataset"
        }
    ],
    "scheduler": {}
}
],
...
}

Written by:

Urvi Verma

Data Architect
