Azure data factory Full Load pipeline
There are main pipeline which start the ADF machine as we have integration runtime to connect Oracle database .
{
"name": "PL_INCR_Master_CCBProd_Extract",
"properties": {
"activities": [
{
"name": "PL_CCB_Full_Extraction",
"type": "ExecutePipeline",
"dependsOn": [
{
"activity": "IfFileExistsDelete",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"pipeline": {
"referenceName": "PL_CCB_Full_Extraction_Generic",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"iscdc": 0
}
}
},
{
"name": "Copy_Create_File",
"type": "Copy",
"dependsOn": [
{
"activity": "PL_CCB_Full_Extraction",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "select getdate() as TodaysDate",
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "DS_SQL_Input_Transfomation_Generic",
"type": "DatasetReference",
"parameters": {
"TableName": "NoTable",
"SchemaName": "CCB"
}
}
],
"outputs": [
{
"referenceName": "DS_ADLS_CCB_PROD_Extract_File",
"type": "DatasetReference",
"parameters": {
"folder_name": "ccbprod",
"file_name": "ccb_prod_extraction_completion.txt"
}
}
]
},
{
"name": "GetMetadataCCBProdFile",
"type": "GetMetadata",
"dependsOn": [
{
"activity": "Wait1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "DS_ADLS_CCB_PROD_Extract_File",
"type": "DatasetReference",
"parameters": {
"folder_name": "ccbprod",
"file_name": "ccb_prod_extraction_completion.txt"
}
},
"fieldList": [
"exists",
"lastModified",
"itemName"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "IfFileExistsDelete",
"type": "IfCondition",
"dependsOn": [
{
"activity": "GetMetadataCCBProdFile",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@activity('GetMetadataCCBProdFile').output.exists",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "DeleteCCBExtractFileCreated",
"type": "Delete",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "DS_ADLS_CCB_PROD_Extract_File",
"type": "DatasetReference",
"parameters": {
"folder_name": "ccbprod",
"file_name": "ccb_prod_extraction_completion.txt"
}
},
"enableLogging": false,
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
}
}
}
]
}
},
{
"name": "RestartADFMachine",
"type": "WebHook",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"url": {
"value": "@pipeline().globalParameters.ADFRestart",
"type": "Expression"
},
"method": "POST",
"body": {
"VMAction": "Start"
},
"timeout": "01:10:00"
}
},
{
"name": "Wait1",
"type": "Wait",
"dependsOn": [
{
"activity": "RestartADFMachine",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"waitTimeInSeconds": 1050
}
}
],
"concurrency": 1,
"annotations": [],
"lastPublishTime": "2023-01-30T18:27:53Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
We have use execute pipeline to load all dim and fact .first part to load only small dim table :-
{
"name": "PL_CCB_Full_Extraction_Generic",
"properties": {
"activities": [
{
"name": "GetTableList",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderStoredProcedureName": "[STAGING].[usp_IncrementalExtractions]",
"storedProcedureParameters": {
"CDCFlag": {
"type": "Int32",
"value": {
"value": "@pipeline().parameters.iscdc",
"type": "Expression"
}
}
},
"queryTimeout": "02:00:00",
"partitionOption": "None"
},
"dataset": {
"referenceName": "ExractioonSourceDatasetQuery",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachToDelete",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetTableList",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('GetTableList').output.Value",
"type": "Expression"
},
"activities": [
{
"name": "GetADLSMetadata",
"type": "GetMetadata",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "DS_ADLS_Incremental_Input",
"type": "DatasetReference",
"parameters": {
"Dir": {
"value": "@item().TableName",
"type": "Expression"
},
"IsCDC": {
"value": "@item().IsCDC",
"type": "Expression"
}
}
},
"fieldList": [
"exists",
"itemName",
"childItems"
],
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "IfCDCFalse",
"type": "IfCondition",
"dependsOn": [
{
"activity": "GetADLSMetadata",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@activity('GetADLSMetadata').output.exists",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "ClearPreviousRunsNonCDCTable",
"type": "Delete",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "DS_ADLS_Incremental_Input",
"type": "DatasetReference",
"parameters": {
"Dir": {
"value": "@activity('GetADLSMetadata').output.itemName",
"type": "Expression"
},
"IsCDC": {
"value": "@item().IsCDC",
"type": "Expression"
}
}
},
"enableLogging": false,
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"maxConcurrentConnections": 1,
"recursive": true
}
}
}
]
}
}
]
}
},
{
"name": "ForEachToLoad",
"type": "ForEach",
"dependsOn": [
{
"activity": "ForEachToDelete",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('GetTableList').output.value",
"type": "Expression"
},
"isSequential": false,
"activities": [
{
"name": "ExtractionSourceTables",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 3,
"retryIntervalInSeconds": 120,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "OracleSource",
"oracleReaderQuery": {
"value": "@{item().Query}",
"type": "Expression"
},
"partitionOption": "None",
"queryTimeout": "24:00:00"
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"parallelCopies": 1
},
"inputs": [
{
"referenceName": "ExtractFullDataFromOracle",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "DS_ADLS_Incremental_Output",
"type": "DatasetReference",
"parameters": {
"Dir": {
"value": "@item().TableName",
"type": "Expression"
},
"FileName": {
"value": "@CONCAT(item().TableName,'_',REPLACE(REPLACE(REPLACE(SUBSTRING(UTCNOW(),0,19),'T','_'),'-','_'),':','_'),'.txt')",
"type": "Expression"
},
"IsCDC": {
"value": "@item().IsCDC",
"type": "Expression"
},
"IsHistory": {
"value": "@item().IsHistory",
"type": "Expression"
}
}
}
]
},
{
"name": "SpLoadLoggingOnSuccess",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "ExtractionSourceTables",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
"storedProcedureParameters": {
"ACTIVITY_NAME": {
"value": {
"value": "ExtractionSourceTables",
"type": "Expression"
},
"type": "String"
},
"FILEPATH": {
"value": {
"value": "@item().ExtractionPath",
"type": "Expression"
},
"type": "String"
},
"PIPELINE_NAME": {
"value": {
"value": "@pipeline().Pipeline",
"type": "Expression"
},
"type": "String"
},
"ROWS_READ": {
"value": {
"value": "@activity('ExtractionSourceTables').output.rowsRead",
"type": "Expression"
},
"type": "Int32"
},
"ROWS_WRITTEN": {
"value": {
"value": "@activity('ExtractionSourceTables').output.rowsCopied",
"type": "Expression"
},
"type": "Int32"
},
"STAGE": {
"value": "OracleToADLS",
"type": "String"
},
"STATUS": {
"value": "SUCCESS",
"type": "String"
},
"TABLE_NAME": {
"value": {
"value": "@item().TableName",
"type": "Expression"
},
"type": "String"
},
"ERROR": {
"value": " ",
"type": "String"
},
"START_DATE": {
"value": {
"value": "@activity('ExtractionSourceTables').output.executionDetails[0].start",
"type": "Expression"
},
"type": "Datetime"
}
}
},
"linkedServiceName": {
"referenceName": "Connection_SQLDB_SelfHostedIR",
"type": "LinkedServiceReference"
}
},
{
"name": "SpLoadLoggingOnFailure",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "ExtractionSourceTables",
"dependencyConditions": [
"Failed"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
"storedProcedureParameters": {
"ACTIVITY_NAME": {
"value": {
"value": "ExtractionSourceTables",
"type": "Expression"
},
"type": "String"
},
"FILEPATH": {
"value": {
"value": "@item().ExtractionPath",
"type": "Expression"
},
"type": "String"
},
"PIPELINE_NAME": {
"value": {
"value": "@pipeline().Pipeline",
"type": "Expression"
},
"type": "String"
},
"ROWS_READ": {
"value": {
"value": "@activity('ExtractionSourceTables').output.rowsRead",
"type": "Expression"
},
"type": "Int32"
},
"ROWS_WRITTEN": {
"value": {
"value": "@activity('ExtractionSourceTables').output.rowsCopied",
"type": "Expression"
},
"type": "Int32"
},
"STAGE": {
"value": "OracleToADLS",
"type": "String"
},
"STATUS": {
"value": "FAILED",
"type": "String"
},
"TABLE_NAME": {
"value": {
"value": "@item().TableName",
"type": "Expression"
},
"type": "String"
},
"ERROR": {
"value": {
"value": "@activity('ExtractionSourceTables').output.errors[0].Message",
"type": "Expression"
},
"type": "String"
},
"START_DATE": {
"value": {
"value": "@activity('ExtractionSourceTables').output.executionDetails[0].start",
"type": "Expression"
},
"type": "Datetime"
}
}
},
"linkedServiceName": {
"referenceName": "Connection_SQLDB_SelfHostedIR",
"type": "LinkedServiceReference"
}
},
{
"name": "IfCDCTableUpdateWaterMark",
"type": "IfCondition",
"dependsOn": [
{
"activity": "SpLoadLoggingOnSuccess",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@equals(item().IsCDC,1)",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "LookupOracleWaterMark",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "OracleSource",
"oracleReaderQuery": {
"value": "@{item().WaterMarkMaxDateQuery}",
"type": "Expression"
},
"partitionOption": "None",
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ExtractFromOracle",
"type": "DatasetReference"
}
}
},
{
"name": "SPUpdateWaterMarkFieldValue",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "LookupOracleWaterMark",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[STAGING].[usp_UpdateWaterMarkValue]",
"storedProcedureParameters": {
"tableName": {
"value": {
"value": "@item().TableName",
"type": "Expression"
},
"type": "String"
},
"watermarkvalue": {
"value": {
"value": "@activity('LookupOracleWaterMark').output.firstRow.MAXWATERMARKVALUE",
"type": "Expression"
},
"type": "String"
}
}
},
"linkedServiceName": {
"referenceName": "Connection_SQLDB_SelfHostedIR",
"type": "LinkedServiceReference"
}
}
]
}
},
{
"name": "CopyFilesToIncrementalZone",
"type": "ExecutePipeline",
"dependsOn": [
{
"activity": "IfCDCTableUpdateWaterMark",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"pipeline": {
"referenceName": "PL_Incr_CopyIncrementalFiles",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"TableName": {
"value": "@{item().TableName}",
"type": "Expression"
},
"IsCDC": {
"value": "@item().IsCDC",
"type": "Expression"
}
}
}
}
]
}
}
],
"concurrency": 1,
"parameters": {
"iscdc": {
"type": "int",
"defaultValue": 0
}
},
"folder": {
"name": "ExtractionPipeline"
},
"annotations": [],
"lastPublishTime": "2023-01-30T18:25:17Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}