
Azure Analysis Services Partition Creation and Processing Using an Azure Automation Account (PowerShell)

Azure Analysis Services is a cloud-based Microsoft product used to analyze raw data and turn it into information that supports business decisions.

This article shows how to create a partition for a fact table on a monthly basis and then process it. For example, we can create a partition named TableName_082021 and process that newly created partition.

To create the partition we use an Azure Automation account. The following URL explains how to create one:

https://docs.microsoft.com/en-us/azure/automation/automation-create-standalone-account

Once the Automation account has been created, create a credential in it:

https://docs.microsoft.com/en-us/azure/automation/shared-resources/credentials?tabs=azure-powershell

After that, create a PowerShell runbook and add the PowerShell code below:

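param(
    [Parameter(Mandatory = $false)]
    [object] $WebhookData
)

# Echo the incoming webhook payload to the job output for troubleshooting.
$WebhookData

# ADF sends the parameters as JSON in the request body. The Webhook activity
# also adds callBackUri, which is used to report the result back to ADF.
if ($WebhookData) {
    $parameters = ConvertFrom-Json -InputObject $WebhookData.RequestBody
    if ($parameters.callBackUri) { $callBackUri = $parameters.callBackUri }
}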

## Get the credential that was stored earlier.
$cred = Get-AutomationPSCredential -Name 'credentialsname'

## Read the parameters from the request body and echo each one to the job output.
$ServerName = $parameters.ServerName
$ServerName
$DatabaseName = $parameters.DatabaseName
$DatabaseName
$TableName = $parameters.TableName
$TableName
$Query = $parameters.Query
$Query
$PartitionName = $parameters.PartitionName
$PartitionName
$IsPartitionEnabledFlg = $parameters.IsPartitionEnabled
$IsPartitionEnabledFlg


## Create the partition named in the request (you can script out the JSON code from SSMS).
try{
    # Accept both "True" and "1": a SQL bit column concatenated into the message body arrives as 1 or 0.
    if ($IsPartitionEnabledFlg -in @("True", "1"))
{
# The incoming SQL query is embedded in the command, then $Query is reused to hold the full command.
$Query = "
{
  `"createOrReplace`": {
    `"object`": {
      `"database`": `""+$DatabaseName+"`",
      `"table`": `""+$TableName+"`",
      `"partition`": `""+$PartitionName+"`"
    },
    `"partition`": {
      `"name`": `""+$PartitionName+"`",
      `"dataView`": `"full`",
      `"source`": {
        `"query`": [
          `""+$Query+"`"
        ],
        `"dataSource`": `"SqlAzure`"
      }
    }
  }
}"

Write-Output $Query
## Create the partition

Invoke-ASCmd -Server $ServerName -Credential $cred -Query $Query
## Process the partition

$result=Invoke-ProcessPartition -Server $ServerName -Database $DatabaseName -TableName $TableName -PartitionName $PartitionName -RefreshType Full -Credential $cred

 $body = [ordered]@{
    output = @{Message = "Success"}
    statusCode = "200"
    }

$bodyJson = $body | ConvertTo-Json

if($callBackUri){
    Invoke-WebRequest -Uri $callBackUri -UseBasicParsing -Method Post -Body $bodyJson -ContentType "application/json"
}
}


else {
## Partitioning is not enabled for this table, so process the whole table.
Invoke-ProcessTable -Server $ServerName -Database $DatabaseName -TableName $TableName -RefreshType Full -Credential $cred
 $body = [ordered]@{
    output = @{Message = "Success"}
    statusCode = "200"
    }

$bodyJson = $body | ConvertTo-Json

if($callBackUri){
    Invoke-WebRequest -Uri $callBackUri -UseBasicParsing -Method Post -Body $bodyJson -ContentType "application/json"
}
 }
 
}
catch {
    # $_ is the error record that triggered this catch block.
    $ErrorCustom = $_.Exception.Message
    Write-Output "In catch block"

    # Report the failure to ADF; the error text is passed in output.Message.
    $body = [ordered]@{
        output = @{Message = "$ErrorCustom"}
        error = @{
            ErrorCode = "CustomError"
            Message = "Custom Error Failure"
        }
        statusCode = "404"
    }

    $bodyJson = $body | ConvertTo-Json

    if($callBackUri){
        Invoke-WebRequest -Uri $callBackUri -UseBasicParsing -Method Post -Body $bodyJson -ContentType "application/json"
    }
}
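
The runbook above assembles the createOrReplace command by string concatenation, which breaks as soon as the source query contains a double quote or a backslash. As a minimal sketch of an alternative, the same structure can be described with nested hashtables and serialized with ConvertTo-Json, which handles the escaping. The sample values and the variable names $SourceQuery, $command and $Tmsl are assumptions for illustration; they are not part of the original runbook.

```powershell
# Hypothetical sample values; in the runbook these come from the webhook body.
$DatabaseName  = 'CCB_SALES'
$TableName     = 'SalesFact'
$PartitionName = 'SalesFact_082021'
$SourceQuery   = 'SELECT * FROM [dbo].[SalesFact] WHERE Region = ''EU'''

# Describe the createOrReplace command as nested ordered hashtables,
# mirroring the property names used in the runbook's JSON string.
$command = [ordered]@{
    createOrReplace = [ordered]@{
        object = [ordered]@{
            database  = $DatabaseName
            table     = $TableName
            partition = $PartitionName
        }
        partition = [ordered]@{
            name     = $PartitionName
            dataView = 'full'
            source   = [ordered]@{
                query      = @($SourceQuery)   # kept as a one-element array
                dataSource = 'SqlAzure'
            }
        }
    }
}

# The default depth of ConvertTo-Json is 2, which would truncate the
# nested source object, so a larger depth is requested explicitly.
$Tmsl = $command | ConvertTo-Json -Depth 10
Write-Output $Tmsl
```

The resulting $Tmsl string could then be passed to Invoke-ASCmd through its -Query parameter in the same way the runbook passes $Query.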

The Automation account is now ready. Next, we need to create an ADF pipeline and trigger the runbook from that pipeline.
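
The ADF Webhook activity waits until the runbook posts a reply to callBackUri, and the runbook builds that reply in three separate places. One possible way to remove the duplication is a small helper function. This is only a sketch: the function name New-CallbackBody and its parameters are hypothetical, and the status codes simply mirror the values used in the runbook.

```powershell
# Hypothetical helper that builds the JSON reply posted to callBackUri.
function New-CallbackBody {
    param(
        [Parameter(Mandatory = $true)]
        [string] $Message,

        # Set this switch when reporting a failure.
        [switch] $Failed
    )

    $body = [ordered]@{
        output = @{ Message = $Message }
    }

    if ($Failed) {
        # Same error shape and status code as the runbook's catch block.
        $body['error'] = @{
            ErrorCode = 'CustomError'
            Message   = $Message
        }
        $body['statusCode'] = '404'
    }
    else {
        $body['statusCode'] = '200'
    }

    $body | ConvertTo-Json
}

# Usage: build the success reply and show it.
$bodyJson = New-CallbackBody -Message 'Success'
Write-Output $bodyJson
```

In the runbook, each of the three hand-built bodies could then be replaced by one call to the helper, followed by the existing Invoke-WebRequest to $callBackUri.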

Create a webhook for the runbook in Azure Automation, as shown in the screenshot below, and save the webhook URL, because it will be used in the ADF Webhook activity:

The JSON code for the ADF pipeline is shown below:

{
    "name": "PL_CCB_RunBook_AutomationCubeProcessing_copy1",
    "properties": {
        "activities": [
            {
                "name": "LookupDataForPartitionTables",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderStoredProcedureName": "[CCB].[usp_cube processing]",
                        "storedProcedureParameters": {
                            "model_name": {
                                "type": "String",
                                "value": {
                                    "value": "@pipeline().parameters.Model_Name",
                                    "type": "Expression"
                                }
                            }
                        },
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "ExractioonSourceDatasetQuery",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEachForPartitionCreation",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "LookupDataForPartitionTables",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('LookupDataForPartitionTables').output.value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "FailureOnTransformation",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "WebHookActivity",
                                    "dependencyConditions": [
                                        "Failed"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
                                "storedProcedureParameters": {
                                    "ACTIVITY_NAME": {
                                        "value": {
                                            "value": "@concat('WebHookActivity-', pipeline().parameters.Model_Name)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "START_DATE": {
                                        "value": {
                                            "value": "@variables('StartDate')",
                                            "type": "Expression"
                                        },
                                        "type": "Datetime"
                                    },
                                    "ERROR": {
                                        "value": {
                                            "value": "@activity('WebHookActivity').output.Message",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "FILEPATH": {
                                        "value": null,
                                        "type": "String"
                                    },
                                    "PIPELINE_NAME": {
                                        "value": {
                                            "value": "@pipeline().Pipeline",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ROWS_READ": {
                                        "value": null,
                                        "type": "Int32"
                                    },
                                    "ROWS_WRITTEN": {
                                        "value": null,
                                        "type": "Int32"
                                    },
                                    "STAGE": {
                                        "value": null,
                                        "type": "String"
                                    },
                                    "STATUS": {
                                        "value": "FAILED",
                                        "type": "String"
                                    },
                                    "TABLE_NAME": {
                                        "value": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "Connection_SQLDB_AzureIR",
                                "type": "LinkedServiceReference"
                            }
                        },
                        {
                            "name": "SuccessOnTransformation",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "UpdateLastPartition",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[STAGING].[usp_InsertLoggingRecords]",
                                "storedProcedureParameters": {
                                    "ACTIVITY_NAME": {
                                        "value": {
                                            "value": "@concat('WebHookActivity-', pipeline().parameters.Model_Name)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "START_DATE": {
                                        "value": {
                                            "value": "@variables('StartDate')",
                                            "type": "Expression"
                                        },
                                        "type": "Datetime"
                                    },
                                    "ERROR": {
                                        "value": null,
                                        "type": "String"
                                    },
                                    "FILEPATH": {
                                        "value": null,
                                        "type": "String"
                                    },
                                    "PIPELINE_NAME": {
                                        "value": {
                                            "value": "@pipeline().Pipeline",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "ROWS_READ": {
                                        "value": {
                                            "value": "1",
                                            "type": "Expression"
                                        },
                                        "type": "Int32"
                                    },
                                    "ROWS_WRITTEN": {
                                        "value": null,
                                        "type": "Int32"
                                    },
                                    "STAGE": {
                                        "value": "FullDataLoad",
                                        "type": "String"
                                    },
                                    "STATUS": {
                                        "value": "SUCCESS",
                                        "type": "String"
                                    },
                                    "TABLE_NAME": {
                                        "value": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "Connection_SQLDB_AzureIR",
                                "type": "LinkedServiceReference"
                            }
                        },
                        {
                            "name": "WebHookActivity",
                            "type": "WebHook",
                            "dependsOn": [
                                {
                                    "activity": "Set variableStartDate",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "url": {
                                    "value": "https://b1cd60af-b57d-4d46-aae3-1a687cf894b1.webhook.eus.azure-automation.net/webhooks?token=EeQ0qskZ44Vo9ZEWglcGMUs%2fTl%2bhzdR2i8Yg31lA7M0%3d",
                                    "type": "Expression"
                                },
                                "method": "POST",
                                "body": {
                                    "value": "@json(item().MessageBody)",
                                    "type": "Expression"
                                },
                                "timeout": "23:00:00",
                                "reportStatusOnCallBack": true
                            }
                        },
                        {
                            "name": "UpdateLastPartition",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "WebHookActivity",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[STAGING].[usp_updatecubepartition]",
                                "storedProcedureParameters": {
                                    "ModelName": {
                                        "value": {
                                            "value": "@item().Model_NAME",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "partitionName": {
                                        "value": {
                                            "value": "@item().PartitionName",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    },
                                    "TableName": {
                                        "value": {
                                            "value": "@item().Table_Name",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "Connection_SQLDB_AzureIR",
                                "type": "LinkedServiceReference"
                            }
                        },
                        {
                            "name": "Set variableStartDate",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "StartDate",
                                "value": {
                                    "value": "@utcnow()",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "Model_Name": {
                "type": "string",
                "defaultValue": "CCB_SALES"
            }
        },
        "variables": {
            "StartDate": {
                "type": "String"
            }
        },
        "annotations": [],
        "lastPublishTime": "2021-09-09T14:38:57Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
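
The WebHookActivity in the pipeline above posts item().MessageBody to the runbook, and ADF adds a callBackUri property to that body. To try the parsing logic without triggering the pipeline, the payload can be simulated locally. The sketch below assumes a made-up server name and callback address; only the property names come from the runbook and the stored procedure.

```powershell
# Hypothetical request body, shaped like the MessageBody column plus the
# callBackUri property that the ADF Webhook activity adds to the payload.
$requestBody = @'
{
  "DatabaseName": "CCB_SALES",
  "TableName": "SalesFact",
  "IsPartitionEnabled": "True",
  "PartitionName": "SalesFact_082021",
  "Query": "SELECT * FROM dbo.SalesFact",
  "ServerName": "asazure://eastus.asazure.windows.net/myserver",
  "callBackUri": "https://example.invalid/callback"
}
'@

# Azure Automation hands the request to the runbook as an object whose
# RequestBody property holds the raw JSON string.
$WebhookData = [pscustomobject]@{ RequestBody = $requestBody }

# Same parsing steps as the runbook.
$parameters  = ConvertFrom-Json -InputObject $WebhookData.RequestBody
$callBackUri = $parameters.callBackUri

Write-Output ("Table:     {0}" -f $parameters.TableName)
Write-Output ("Partition: {0}" -f $parameters.PartitionName)
Write-Output ("Callback:  {0}" -f $callBackUri)
```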

We created a stored procedure that builds the JSON message body for each table. The stored procedure is shown below:

CREATE PROCEDURE [CCB].[usp_cube processing](@model_name nvarchar(30))
AS
(


SELECT  MODEL_NAME,TABLE_NAME,TABLE_QUERY,[DateFormat],IsPartitionEnabled,
CASE WHEN MODEL_NAME='CCB_GL' THEN 'asazure://eastus.asazure.windows.net/preeudsoaanlys02' 
ELSE 'asazure://eastus.asazure.windows.net/preeudsoaanlys01' END AS ServerName,
CASE WHEN IsPartitionEnabled=0 THEN '' ELSE CONCAT(TABLE_NAME,'_',cast(FORMAT(DATEADD(MONTH, DATEDIFF(MONTH, 0, GETDATE())-1, 0),'MMyyyy') as nvarchar)) END AS PartitionName,
CONCAT('{"DatabaseName":"',MODEL_NAME,'",',
        '"TableName":"' ,TABLE_NAME  ,'",',
		'"IsPartitionEnabled":"' ,IsPartitionEnabled,'",',
		'"PartitionName":"',CASE WHEN IsPartitionEnabled=0 THEN '' ELSE CONCAT(TABLE_NAME,'_',cast(FORMAT(DATEADD(MONTH, DATEDIFF(MONTH, 0, GETDATE())-1, 0),'MMyyyy') as nvarchar)) END,'",',
		'"Query":"' , REPLACE(REPLACE(TABLE_QUERY,'@FROMDATE',FORMAT(DATEADD(MONTH, DATEDIFF(MONTH, 0, GETDATE())-1, 0),DateFormat)),'@TODATE',FORMAT(DATEADD(MONTH, DATEDIFF(MONTH, -1, GETDATE())-1, -1),DateFormat)) ,'",',
		'"ServerName":"',CASE WHEN MODEL_NAME='CCB_GL' THEN 'asazure://eastus.asazure.windows.net/preeudsoaanlys02'
                         ELSE 'asazure://eastus.asazure.windows.net/preeudsoaanlys01' END,'"}') AS MessageBody

FROM  CCB.VW_CD_CUBE_ACTIVITY
WHERE  MODEL_NAME=@model_name 
)
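
The DATEADD/DATEDIFF expressions in the stored procedure resolve to the first and last day of the previous month, and the partition name is the table name followed by that month in MMyyyy format. To make the date arithmetic easier to follow, here is a rough PowerShell equivalent. The function name Get-PreviousMonthWindow and its parameters are hypothetical and are not used anywhere in the solution itself.

```powershell
# Hypothetical helper mirroring the stored procedure's date logic.
function Get-PreviousMonthWindow {
    param(
        [Parameter(Mandatory = $true)]
        [string] $TableName,

        # Reference date; defaults to today, like GETDATE() in the procedure.
        [datetime] $Today = (Get-Date)
    )

    $firstOfThisMonth = [datetime]::new($Today.Year, $Today.Month, 1)
    $fromDate = $firstOfThisMonth.AddMonths(-1)   # first day of previous month
    $toDate   = $firstOfThisMonth.AddDays(-1)     # last day of previous month

    $suffix = $fromDate.ToString('MMyyyy', [cultureinfo]::InvariantCulture)

    [pscustomobject]@{
        PartitionName = '{0}_{1}' -f $TableName, $suffix
        FromDate      = $fromDate
        ToDate        = $toDate
    }
}

# Usage: a run on 9 September 2021 targets the August 2021 partition.
$window = Get-PreviousMonthWindow -TableName 'SalesFact' -Today ([datetime]'2021-09-09')
Write-Output $window.PartitionName   # SalesFact_082021
```

In the procedure, the two dates are then formatted with the table's DateFormat value and substituted for @FROMDATE and @TODATE in TABLE_QUERY.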

We created a static table that holds all the information for the AS cubes. The definition of the SQL table is shown below:

CREATE TABLE [CCB].[CD_CUBE_ACTIVITY](
	[CUBE_KEY] [bigint] IDENTITY(100,1) NOT NULL,
	[MODEL_NAME] [varchar](100) NOT NULL,
	[TABLE_NAME] [varchar](500) NOT NULL,
	[TABLE_TYPE] [varchar](8) NOT NULL,
	[TABLE_QUERY] [varchar](3000) NULL,
	[IsPartitionEnabled] [bit] NOT NULL,
	[Partition_Name] [varchar](500) NOT NULL,
	[DateFormat] [varchar](500) NOT NULL,
	[LastPartitionNameCreated] [varchar](500) NOT NULL,
	[LinkedServerName] [varchar](500) NOT NULL,
	[LOAD_DATE] [datetime] NOT NULL,
	[UPDATED_DATE] [datetime] NOT NULL,
	[LOAD_BY] [nvarchar](80) NOT NULL,
	[UPDATED_BY] [nvarchar](80) NOT NULL,
PRIMARY KEY CLUSTERED 
(
	[CUBE_KEY] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

ALTER TABLE [CCB].[CD_CUBE_ACTIVITY] ADD  DEFAULT (getdate()) FOR [LOAD_DATE]
GO

ALTER TABLE [CCB].[CD_CUBE_ACTIVITY] ADD  DEFAULT (getdate()) FOR [UPDATED_DATE]
GO

ALTER TABLE [CCB].[CD_CUBE_ACTIVITY] ADD  DEFAULT (user_name()) FOR [LOAD_BY]
GO

ALTER TABLE [CCB].[CD_CUBE_ACTIVITY] ADD  DEFAULT (user_name()) FOR [UPDATED_BY]
GO

Add the details of every cube to the table defined above. The ADF configuration is shown below:

In the screenshot above, enter the webhook URL that was saved when the webhook was created for the runbook.
