Uncategorized

Azure Data Factory Full Load Pipeline: Extraction Zone

Azure Data Factory Full Load Pipeline

This is the main pipeline. It first starts the ADF machine, because we use an integration runtime to connect to the Oracle database.

{
    "name": "PL_INCR_Master_CCBProd_Extract",
    "properties": {
        "activities": [
            {
                "name": "RestartADFMachine",
                "type": "WebHook",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "url": {
                        "value": "@pipeline().globalParameters.ADFRestart",
                        "type": "Expression"
                    },
                    "method": "POST",
                    "body": {
                        "VMAction": "Start"
                    },
                    "timeout": "01:10:00"
                }
            },
            {
                "name": "Wait1",
                "type": "Wait",
                "dependsOn": [
                    {
                        "activity": "RestartADFMachine",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "waitTimeInSeconds": 1050
                }
            },
            {
                "name": "GetMetadataCCBProdFile",
                "type": "GetMetadata",
                "dependsOn": [
                    {
                        "activity": "Wait1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "DS_ADLS_CCB_PROD_Extract_File",
                        "type": "DatasetReference",
                        "parameters": {
                            "folder_name": "ccbprod",
                            "file_name": "ccb_prod_extraction_completion.txt"
                        }
                    },
                    "fieldList": [
                        "exists",
                        "lastModified",
                        "itemName"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "recursive": true,
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "IfFileExistsDelete",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "GetMetadataCCBProdFile",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@activity('GetMetadataCCBProdFile').output.exists",
                        "type": "Expression"
                    },
                    "ifTrueActivities": [
                        {
                            "name": "DeleteCCBExtractFileCreated",
                            "type": "Delete",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "DS_ADLS_CCB_PROD_Extract_File",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folder_name": "ccbprod",
                                        "file_name": "ccb_prod_extraction_completion.txt"
                                    }
                                },
                                "enableLogging": false,
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "recursive": true,
                                    "enablePartitionDiscovery": false
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "PL_CCB_Full_Extraction",
                "type": "ExecutePipeline",
                "dependsOn": [
                    {
                        "activity": "IfFileExistsDelete",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "pipeline": {
                        "referenceName": "PL_CCB_Full_Extraction_Generic",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true,
                    "parameters": {
                        "iscdc": 0
                    }
                }
            },
            {
                "name": "Copy_Create_File",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "PL_CCB_Full_Extraction",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "select getdate() as TodaysDate",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "sink": {
                        "type": "DelimitedTextSink",
                        "storeSettings": {
                            "type": "AzureBlobFSWriteSettings"
                        },
                        "formatSettings": {
                            "type": "DelimitedTextWriteSettings",
                            "quoteAllText": true,
                            "fileExtension": ".txt"
                        }
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "typeConversion": true,
                        "typeConversionSettings": {
                            "allowDataTruncation": true,
                            "treatBooleanAsNumber": false
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "DS_SQL_Input_Transfomation_Generic",
                        "type": "DatasetReference",
                        "parameters": {
                            "TableName": "NoTable",
                            "SchemaName": "CCB"
                        }
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "DS_ADLS_CCB_PROD_Extract_File",
                        "type": "DatasetReference",
                        "parameters": {
                            "folder_name": "ccbprod",
                            "file_name": "ccb_prod_extraction_completion.txt"
                        }
                    }
                ]
            }
        ],
        "concurrency": 1,
        "annotations": [],
        "lastPublishTime": "2023-01-30T18:27:53Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

We use an Execute Pipeline activity to load all dimension and fact tables. The first part loads only the small dimension tables:

{
    "name": "PL_CCB_Full_Extraction_Generic",
    "properties": {
        "activities": [
            {
                "name": "GetTableList",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderStoredProcedureName": "[STAGING].[usp_IncrementalExtractions]",
                        "storedProcedureParameters": {
                            "CDCFlag": {
                                "type": "Int32",
                                "value": {
                                    "value": "@pipeline().parameters.iscdc",
                                    "type": "Expression"
                                }
                            }
                        },
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "ExractioonSourceDatasetQuery",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEachToDelete",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTableList",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('GetTableList').output.Value",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetADLSMetadata",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "DS_ADLS_Incremental_Input",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "Dir": {
                                            "value": "@item().TableName",
                                            "type": "Expression"
                                        },
                                        "IsCDC": {
                                            "value": "@item().IsCDC",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "exists",
                                    "itemName",
                                    "childItems"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "recursive": true
                                },
                                "formatSettings": {
                                    "type": "DelimitedTextReadSettings"
                                }
                            }
                        },
                        {
                            "name": "IfCDCFalse",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "GetADLSMetadata",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@activity('GetADLSMetadata').output.exists",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "ClearPreviousRunsNonCDCTable",
                                        "type": "Delete",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "7.00:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "dataset": {
                                                "referenceName": "DS_ADLS_Incremental_Input",
                                                "type": "DatasetReference",
                                                "parameters": {
                                                    "Dir": {
                                                        "value": "@activity('GetADLSMetadata').output.itemName",
                                                        "type": "Expression"
                                                    },
                                                    "IsCDC": {
                                                        "value": "@item().IsCDC",
                                                        "type": "Expression"
                                                    }
                                                }
                                            },
                                            "enableLogging": false,
                                            "storeSettings": {
                                                "type": "AzureBlobFSReadSettings",
                                                "maxConcurrentConnections": 1,
                                                "recursive": true
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "ForEachToLoad",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "ForEachToDelete",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('GetTableList').output.value",
                        "type": "Expression"
                    },
                    "isSequential": false,
                    "activities": [
                        {
                            "name": "ExtractionSourceTables",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 3,
                                "retryIntervalInSeconds": 120,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "OracleSource",
                                    "oracleReaderQuery": {
                                        "value": "@{item().Query}",
                                        "type": "Expression"
                                    },
                                    "partitionOption": "None",
                                    "queryTimeout": "24:00:00"
                                },
                                "sink": {
                                    "type": "DelimitedTextSink",
                                    "storeSettings": {
                                        "type": "AzureBlobFSWriteSettings"
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextWriteSettings",
                                        "quoteAllText": true,
                                        "fileExtension": ".txt"
                                    }
                                },
                                "enableStaging": false,
                                "parallelCopies": 1
                            },
                            "inputs": [
                                {
                                    "referenceName": "ExtractFullDataFromOracle",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DS_ADLS_Incremental_Output",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "Dir": {
                                            "value": "@item().TableName",
                                            "type": "Expression"
                                        },
                                        "FileName": {
                                            "value": "@CONCAT(item().TableName,'_',REPLACE(REPLACE(REPLACE(SUBSTRING(UTCNOW(),0,19),'T','_'),'-','_'),':','_'),'.txt')",
                                            "type": "Expression"
                                        },
                                        "IsCDC": {
                                            "value": "@item().IsCDC",
                                            "type": "Expression"
                                        },
                                        "IsHistory": {
                                            "value": "@item().IsHistory",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        },
                        {
                            "name": "SpLoadLoggingOnSuccess",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "ExtractionSourceTables",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
                                "storedProcedureParameters": {
                                    "ACTIVITY_NAME": {
                                        "value": {
                                            "value": "ExtractionSourceTables",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "FILEPATH": {
                                        "value": {
                                            "value": "@item().ExtractionPath",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "PIPELINE_NAME": {
                                        "value": {
                                            "value": "@pipeline().Pipeline",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ROWS_READ": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.rowsRead",
                                            "type": "Expression"
                                        },
                                        "type": "Int32"
                                    },
                                    "ROWS_WRITTEN": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.rowsCopied",
                                            "type": "Expression"
                                        },
                                        "type": "Int32"
                                    },
                                    "STAGE": {
                                        "value": "OracleToADLS",
                                        "type": "String"
                                    },
                                    "STATUS": {
                                        "value": "SUCCESS",
                                        "type": "String"
                                    },
                                    "TABLE_NAME": {
                                        "value": {
                                            "value": "@item().TableName",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ERROR": {
                                        "value": " ",
                                        "type": "String"
                                    },
                                    "START_DATE": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.executionDetails[0].start",
                                            "type": "Expression"
                                        },
                                        "type": "Datetime"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "Connection_SQLDB_SelfHostedIR",
                                "type": "LinkedServiceReference"
                            }
                        },
                        {
                            "name": "SpLoadLoggingOnFailure",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "ExtractionSourceTables",
                                    "dependencyConditions": [
                                        "Failed"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
                                "storedProcedureParameters": {
                                    "ACTIVITY_NAME": {
                                        "value": {
                                            "value": "ExtractionSourceTables",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "FILEPATH": {
                                        "value": {
                                            "value": "@item().ExtractionPath",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "PIPELINE_NAME": {
                                        "value": {
                                            "value": "@pipeline().Pipeline",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ROWS_READ": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.rowsRead",
                                            "type": "Expression"
                                        },
                                        "type": "Int32"
                                    },
                                    "ROWS_WRITTEN": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.rowsCopied",
                                            "type": "Expression"
                                        },
                                        "type": "Int32"
                                    },
                                    "STAGE": {
                                        "value": "OracleToADLS",
                                        "type": "String"
                                    },
                                    "STATUS": {
                                        "value": "FAILED",
                                        "type": "String"
                                    },
                                    "TABLE_NAME": {
                                        "value": {
                                            "value": "@item().TableName",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ERROR": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.errors[0].Message",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "START_DATE": {
                                        "value": {
                                            "value": "@activity('ExtractionSourceTables').output.executionDetails[0].start",
                                            "type": "Expression"
                                        },
                                        "type": "Datetime"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "Connection_SQLDB_SelfHostedIR",
                                "type": "LinkedServiceReference"
                            }
                        },
                        {
                            "name": "IfCDCTableUpdateWaterMark",
                            "description": "Runs after SpLoadLoggingOnSuccess succeeds. When the current item has IsCDC = 1, looks up the watermark value in Oracle and hands it to [STAGING].[usp_UpdateWaterMarkValue]; for any other IsCDC value no activity runs.",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "SpLoadLoggingOnSuccess",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@equals(item().IsCDC,1)",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "LookupOracleWaterMark",
                                        "description": "Runs the query held in the current item's WaterMarkMaxDateQuery field through the ExtractFromOracle dataset and returns only the first row. Retried up to 2 times, 30 seconds apart, so a transient source or integration-runtime error does not skip the watermark update.",
                                        "type": "Lookup",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "7.00:00:00",
                                            "retry": 2,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "source": {
                                                "type": "OracleSource",
                                                "oracleReaderQuery": {
                                                    "value": "@{item().WaterMarkMaxDateQuery}",
                                                    "type": "Expression"
                                                },
                                                "partitionOption": "None",
                                                "queryTimeout": "02:00:00"
                                            },
                                            "dataset": {
                                                "referenceName": "ExtractFromOracle",
                                                "type": "DatasetReference"
                                            },
                                            "firstRowOnly": true
                                        }
                                    },
                                    {
                                        "name": "SPUpdateWaterMarkFieldValue",
                                        "description": "Calls [STAGING].[usp_UpdateWaterMarkValue] with the current item's TableName and the MAXWATERMARKVALUE column of the first row returned by LookupOracleWaterMark.",
                                        "type": "SqlServerStoredProcedure",
                                        "dependsOn": [
                                            {
                                                "activity": "LookupOracleWaterMark",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "policy": {
                                            "timeout": "7.00:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "storedProcedureName": "[STAGING].[usp_UpdateWaterMarkValue]",
                                            "storedProcedureParameters": {
                                                "tableName": {
                                                    "value": {
                                                        "value": "@item().TableName",
                                                        "type": "Expression"
                                                    },
                                                    "type": "String"
                                                },
                                                "watermarkvalue": {
                                                    "value": {
                                                        "value": "@activity('LookupOracleWaterMark').output.firstRow.MAXWATERMARKVALUE",
                                                        "type": "Expression"
                                                    },
                                                    "type": "String"
                                                }
                                            }
                                        },
                                        "linkedServiceName": {
                                            "referenceName": "Connection_SQLDB_SelfHostedIR",
                                            "type": "LinkedServiceReference"
                                        }
                                    }
                                ]
                            }
                        },
                        {
                            "name": "CopyFilesToIncrementalZone",
                            "description": "Runs after IfCDCTableUpdateWaterMark succeeds. Invokes pipeline PL_Incr_CopyIncrementalFiles with the current item's TableName and IsCDC values and waits for that pipeline to complete.",
                            "type": "ExecutePipeline",
                            "dependsOn": [
                                {
                                    "activity": "IfCDCTableUpdateWaterMark",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "PL_Incr_CopyIncrementalFiles",
                                    "type": "PipelineReference"
                                },
                                "waitOnCompletion": true,
                                "parameters": {
                                    "TableName": {
                                        "value": "@{item().TableName}",
                                        "type": "Expression"
                                    },
                                    "IsCDC": {
                                        "value": "@item().IsCDC",
                                        "type": "Expression"
                                    }
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "concurrency": 1,
        "parameters": {
            "iscdc": {
                "type": "int",
                "defaultValue": 0
            }
        },
        "folder": {
            "name": "ExtractionPipeline"
        },
        "annotations": [],
        "lastPublishTime": "2023-01-30T18:25:17Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Leave a Reply

Your email address will not be published. Required fields are marked *

Back To Top