Copy CSV file data from blob storage to Azure SQL database based on the number of columns in the file

I have data files landing in a single Azure Blob Storage container every other day or so. These files have either 8, 16, 24, or 32 columns of data. Each column has a unique name within a file, and the names are consistent across files, i.e., the column names in the 8-column file are always the first 8 column names of the 16-, 24-, and 32-column files. I have the four corresponding tables set up in an Azure SQL database to receive the files. I need to create one or more pipelines in Azure Data Factory that will:
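The naming rule above means a valid file's header should be exactly the first N names of the 32-column layout, with N in {8, 16, 24, 32}. A minimal Python sketch of that check, assuming hypothetical column names col1 to col32 (the real names aren't given in the question):

```python
# Hypothetical master list: the real 32 column names are not given in the question.
ALL_COLUMNS = [f"col{i}" for i in range(1, 33)]
ALLOWED_COUNTS = {8, 16, 24, 32}


def header_is_valid(header: list[str]) -> bool:
    """True if the header has an allowed width and equals the first N master names."""
    n = len(header)
    return n in ALLOWED_COUNTS and header == ALL_COLUMNS[:n]


print(header_is_valid(ALL_COLUMNS[:16]))  # True
print(header_is_valid(ALL_COLUMNS[:10]))  # False: 10 is not an allowed width
print(header_is_valid(ALL_COLUMNS[1:9]))  # False: 8 names, but not the first 8
```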

  1. trigger upon the landing of a new file in the blob storage container
  2. check the number of columns in that file
  3. use the number of columns to copy the file from the blob into the appropriate Azure SQL database table, meaning the 8-column blob file copies to the 8-column SQL table, and so on.
  4. delete the file

I've researched the various pieces needed to complete this but cannot seem to put them together. A schema drift solution got me close, but parameterizing the file names lost me. Using multiple pipelines is okay, as long as the single storage container is maintained. Thanks

Answer by Nandan
  1. Use a storage event trigger to fire when a new file lands in the blob storage container.
  2. Use a Get Metadata activity to get the number of columns in the file.
  3. Use a Switch activity on the number of columns. In each case, add a Copy activity that copies the data into the matching table and a Delete activity that then deletes the file.
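A minimal Python sketch of the Switch routing in step 3. In ADF, the Switch activity's expression must evaluate to a string, so the column count would typically be wrapped as @string(activity('Get Metadata1').output.columnCount), with case values "8", "16", "24" and "32". The table names below are hypothetical, and raising an error for unknown counts (in ADF, for example, a Fail activity in the default case) is a design choice, not part of the answer.

```python
# Hypothetical table names; substitute the four real target tables.
CASES = {
    "8": "dbo.Data8",
    "16": "dbo.Data16",
    "24": "dbo.Data24",
    "32": "dbo.Data32",
}


def route(column_count: int) -> str:
    """Mimic a Switch on string(columnCount): return the target table for a case."""
    key = str(column_count)  # Switch case values are strings, like @string(...)
    if key not in CASES:
        # Default case: fail loudly rather than copy into the wrong table.
        raise ValueError(f"No target table for a {column_count}-column file")
    return CASES[key]


print(route(24))  # dbo.Data24
```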
Answer by Rakesh Govindula

I agree with @Nandan's approach. If you want to avoid creating Switch cases, you can also try the alternative below, which uses a Lookup and a Filter activity.

For this approach, the target database should contain no tables other than the four above; otherwise a different table with the same column count could be matched.

First, create a pipeline parameter (tfilename in the pipeline JSON below) and a Storage event trigger. In the trigger, pass @triggerBody().fileName to that pipeline parameter.

Next, add a Lookup activity (with "First row only" unchecked) that runs the query below to get each table's schema, name, and column count as an array of objects.

SELECT TABLE_SCHEMA
    , TABLE_NAME
    , number = COUNT(*) 
FROM INFORMATION_SCHEMA.COLUMNS 
where TABLE_NAME!='database_firewall_rules'
GROUP BY TABLE_SCHEMA, TABLE_NAME;

This returns a JSON array in the Lookup's output.value, with one object per table containing TABLE_SCHEMA, TABLE_NAME, and number (the column count).
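To make the shape of the Lookup output concrete, the GROUP BY query can be mimicked in Python over a few hypothetical INFORMATION_SCHEMA.COLUMNS rows (two made-up tables, Data8 and Data16). This is only an illustration of the data shape, not how ADF runs the query.

```python
from collections import Counter

# Hypothetical rows as INFORMATION_SCHEMA.COLUMNS would return them:
# (TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME)
rows = (
    [("dbo", "Data8", f"col{i}") for i in range(1, 9)]
    + [("dbo", "Data16", f"col{i}") for i in range(1, 17)]
)

# Equivalent of GROUP BY TABLE_SCHEMA, TABLE_NAME with number = COUNT(*)
counts = Counter((schema, table) for schema, table, _ in rows)
lookup_value = [
    {"TABLE_SCHEMA": s, "TABLE_NAME": t, "number": n}
    for (s, t), n in counts.items()
]
print(lookup_value)
```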

Next, add a Get Metadata activity on the source dataset, passing the triggered file name through a dataset parameter, and select the Column count field.
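Outside ADF, a similar column count can be approximated by counting the fields in the file's first row. This sketch assumes comma-delimited text and that Get Metadata's columnCount reflects the first row; it is only a local stand-in for checking files before they land.

```python
import csv
import io


def column_count(csv_text: str) -> int:
    """Count the fields in the first row of comma-delimited text (0 for an empty file)."""
    reader = csv.reader(io.StringIO(csv_text))
    first_row = next(reader, [])  # empty input yields no rows
    return len(first_row)


sample = "a,b,c,d,e,f,g,h\n1,2,3,4,5,6,7,8\n"
print(column_count(sample))  # 8
```

Using the csv module rather than splitting on commas means quoted fields such as "y,z" count as one column.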

Then use a Filter activity to keep only the SQL table whose column count equals that of the triggered file:

Items: @activity('Lookup1').output.value
Condition: @equals(activity('Get Metadata1').output.columnCount, item().number)
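The Filter condition is a per-item equality test. A small Python equivalent, using hypothetical table entries, also makes explicit the assumption behind reading Value[0] in the copy step: exactly one table must match.

```python
def filter_tables(lookup_value: list[dict], file_column_count: int) -> list[dict]:
    """Same predicate as @equals(activity('Get Metadata1').output.columnCount, item().number)."""
    return [item for item in lookup_value if item["number"] == file_column_count]


# Hypothetical Lookup output entries.
tables = [
    {"TABLE_SCHEMA": "dbo", "TABLE_NAME": "Data8", "number": 8},
    {"TABLE_SCHEMA": "dbo", "TABLE_NAME": "Data16", "number": 16},
]
matches = filter_tables(tables, 16)
# The copy step reads Value[0], so exactly one match is expected;
# zero or several matches mean the "only these tables" assumption is broken.
assert len(matches) == 1, f"expected one matching table, got {len(matches)}"
print(matches[0]["TABLE_NAME"])  # Data16
```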

The Filter output's Value array then contains only the matching table entry.

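Now, add a Copy activity that uses dataset parameters. For the source, pass the pipeline parameter (the triggered file name) to the source dataset's file name parameter. For the sink, pass @activity('Filter1').output.Value[0].TABLE_SCHEMA and @activity('Filter1').output.Value[0].TABLE_NAME to the SQL dataset's schema and table name parameters.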

Finally, add a Delete activity on the same parameterized source dataset to delete the triggered file once the copy succeeds.

My pipeline JSON:

{
"name": "pipeline5_copy1",
"properties": {
    "activities": [
        {
            "name": "Lookup1",
            "type": "Lookup",
            "dependsOn": [],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "AzureSqlSource",
                    "sqlReaderQuery": "SELECT TABLE_SCHEMA\n    , TABLE_NAME\n    , number = COUNT(*) \nFROM INFORMATION_SCHEMA.COLUMNS \nwhere TABLE_NAME!='database_firewall_rules'\nGROUP BY TABLE_SCHEMA, TABLE_NAME;",
                    "queryTimeout": "02:00:00",
                    "partitionOption": "None"
                },
                "dataset": {
                    "referenceName": "Dataset_for_column_count",
                    "type": "DatasetReference"
                },
                "firstRowOnly": false
            }
        },
        {
            "name": "Filter1",
            "type": "Filter",
            "dependsOn": [
                {
                    "activity": "Get Metadata1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "userProperties": [],
            "typeProperties": {
                "items": {
                    "value": "@activity('Lookup1').output.value",
                    "type": "Expression"
                },
                "condition": {
                    "value": "@equals(activity('Get Metadata1').output.columnCount, item().number)",
                    "type": "Expression"
                }
            }
        },
        {
            "name": "Get Metadata1",
            "type": "GetMetadata",
            "dependsOn": [
                {
                    "activity": "Lookup1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "dataset": {
                    "referenceName": "Sourcefile",
                    "type": "DatasetReference",
                    "parameters": {
                        "sourcefilename": {
                            "value": "@pipeline().parameters.tfilename",
                            "type": "Expression"
                        }
                    }
                },
                "fieldList": [
                    "columnCount"
                ],
                "storeSettings": {
                    "type": "AzureBlobFSReadSettings",
                    "enablePartitionDiscovery": false
                },
                "formatSettings": {
                    "type": "DelimitedTextReadSettings"
                }
            }
        },
        {
            "name": "Copy data1",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "Filter1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "DelimitedTextSource",
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "recursive": true,
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                },
                "sink": {
                    "type": "AzureSqlSink",
                    "writeBehavior": "insert",
                    "sqlWriterUseTableLock": false
                },
                "enableStaging": false,
                "translator": {
                    "type": "TabularTranslator",
                    "typeConversion": true,
                    "typeConversionSettings": {
                        "allowDataTruncation": true,
                        "treatBooleanAsNumber": false
                    }
                }
            },
            "inputs": [
                {
                    "referenceName": "Sourcefile",
                    "type": "DatasetReference",
                    "parameters": {
                        "sourcefilename": {
                            "value": "@pipeline().parameters.tfilename",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "outputs": [
                {
                    "referenceName": "AzureSqlTable1",
                    "type": "DatasetReference",
                    "parameters": {
                        "schema": {
                            "value": "@activity('Filter1').output.Value[0].TABLE_SCHEMA",
                            "type": "Expression"
                        },
                        "table_name": {
                            "value": "@activity('Filter1').output.Value[0].TABLE_NAME",
                            "type": "Expression"
                        }
                    }
                }
            ]
        },
        {
            "name": "Delete1",
            "type": "Delete",
            "dependsOn": [
                {
                    "activity": "Copy data1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "dataset": {
                    "referenceName": "Sourcefile",
                    "type": "DatasetReference",
                    "parameters": {
                        "sourcefilename": {
                            "value": "@pipeline().parameters.tfilename",
                            "type": "Expression"
                        }
                    }
                },
                "enableLogging": false,
                "storeSettings": {
                    "type": "AzureBlobFSReadSettings",
                    "recursive": true,
                    "enablePartitionDiscovery": false
                }
            }
        }
    ],
    "parameters": {
        "tfilename": {
            "type": "string"
        }
    },
    "variables": {
        "sample": {
            "type": "String"
        }
    },
    "annotations": []
}
}
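To test the matching logic before wiring up ADF, the whole flow can be approximated locally. This is a sketch under clear assumptions: Python's built-in sqlite3 stands in for Azure SQL, a local temp file stands in for the blob, and the two- and three-column tables Data2/Data3 are hypothetical stand-ins for the 8/16/24/32-column tables.

```python
import csv
import os
import sqlite3
import tempfile


def table_column_counts(conn: sqlite3.Connection) -> dict[str, int]:
    """Like the Lookup query: map each table to its number of columns."""
    names = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")]
    # Table names come from sqlite_master, not from user input.
    return {name: len(conn.execute(f"PRAGMA table_info({name})").fetchall())
            for name in names}


def load_and_delete(path: str, conn: sqlite3.Connection) -> str:
    """Local stand-in for Get Metadata -> Filter -> Copy -> Delete."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)  # column names (first row)
        rows = list(reader)    # data rows
    # Filter: exactly one table must have the same column count as the file.
    matches = [t for t, n in table_column_counts(conn).items() if n == len(header)]
    if len(matches) != 1:
        raise ValueError(f"expected one table with {len(header)} columns, got {matches}")
    table = matches[0]
    placeholders = ", ".join("?" * len(header))
    # Copy: positional insert, relying on file and table having the same column order.
    conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
    conn.commit()
    os.remove(path)  # Delete: only reached if the insert succeeded
    return table


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Data2 (a TEXT, b TEXT)")
conn.execute("CREATE TABLE Data3 (a TEXT, b TEXT, c TEXT)")
fd, path = tempfile.mkstemp(suffix=".csv")
with os.fdopen(fd, "w", newline="") as f:
    f.write("a,b,c\n1,2,3\n4,5,6\n")
print(load_and_delete(path, conn))  # Data3
```

As in the pipeline, the file is deleted only after the copy succeeds, so a failed load leaves the file in place for a retry.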
