I have three pipelines in Data Fusion, say A, B and C. I want pipeline C to be triggered after both pipeline A and pipeline B have completed execution. Pipeline triggers only let me put the dependency on a single pipeline. Can this be implemented in Data Fusion?
Pipeline Dependencies in Data Fusion
Asked by SUDHIR GARG
There is no direct way that I could think of, but here are two workarounds.
Workaround 1: Merge pipelines A and B into a single pipeline AB, then trigger pipeline C (AB > C).
Pipeline A - (GCS Copy > Decompress), Pipeline B - (GCS2 > thrashsad)
Add a BigQueryExecute stage to mitigate the error: "Invalid DAG. There is an island made up of stages...".
In the BigQueryExecute stage, use a valid but dummy query.
Merging the two pipelines into one can make pipeline testing harder. To overcome this, you can add a dummy condition so that only one of the pipelines runs at a time:
- In BigQueryExecute, change the query to 'Select ${flag}' and pass the value of flag as a runtime argument, or use 'Select 1 as flag' and set "Row As Arguments" to true.
- Add a Condition plugin after BigQueryExecute and set the condition to runtime['flag'] = 1.
- The Condition plugin has two outlets; connect them to pipeline A and pipeline B.
Workaround 2: Store the flags of both pipelines (A & B) in a BigQuery table and create two triggers, A > C and B > C, to trigger pipeline C. This triggers pipeline C twice, but by using a BigQueryExecute and a Condition plugin the rest of pipeline C will only run when both flags are present in the BigQuery table.
How?
- In Pipeline A & B to write output (a row) to BigQuery table 'Pipeline_Run'
- In Pipeline C, add BigQueryExecute and query 'select count(*) as Cnt from ds.Pipeline_Run' and tick "Row As Arguments" to true.
- In Pipeline C, add Condition plugin and check if value of cnt is 2 (runtime['cnt'] = 2) and connect your rest of the pipeline's plugins to its "Yes" outlet.
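For illustration only, the check that pipeline C's Condition plugin performs could be written like this in Python with the BigQuery client. The ds.Pipeline_Run table is the one described above; the client setup and the threshold of 2 are assumptions for this sketch, not part of the Data Fusion pipeline itself:

```python
# Illustration of the "both flags present" check that pipeline C's
# Condition plugin evaluates. Not part of the Data Fusion pipeline.
from google.cloud import bigquery

client = bigquery.Client()  # assumes default project and credentials

# Same query as the BigQueryExecute stage in pipeline C.
rows = list(client.query("SELECT COUNT(*) AS cnt FROM ds.Pipeline_Run").result())
cnt = rows[0]["cnt"]

if cnt == 2:
    # Both pipeline A and pipeline B have written their flag row,
    # so the rest of pipeline C is allowed to run (the "Yes" outlet).
    print("Both upstream pipelines finished; continue pipeline C")
else:
    # Only one (or neither) flag is present; this run of pipeline C stops here.
    print("Waiting for both upstream pipelines; count is {}".format(cnt))
```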
You can do it using Google Cloud Composer [1]. To perform this, first you need to create a new environment in Google Cloud Composer [2]. Once that is done, you need to install a new Python package in your environment [3]; the package you need to install is "apache-airflow-backport-providers-google" [4].
With this package installed you will be able to use these operators [5]. The one you need is "Start a DataFusion pipeline" [6], which lets you start a pipeline from Airflow.
An example of the Python code would be as follows. This is only a minimal sketch: the DAG name, region, Data Fusion instance name and pipeline names are placeholders you need to replace with the values from your own environment, and it assumes the backport provider package above is installed.
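```python
from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)
from airflow.utils.dates import days_ago

# Placeholder values -- replace with your own environment's settings.
LOCATION = "us-central1"
INSTANCE_NAME = "my-datafusion-instance"

with DAG(
    dag_id="datafusion_a_b_then_c",
    start_date=days_ago(1),
    schedule_interval=None,  # or a cron expression / timedelta, see the Airflow docs
) as dag:

    start_pipeline_a = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline_a",
        location=LOCATION,
        instance_name=INSTANCE_NAME,
        pipeline_name="pipeline-A",
    )

    start_pipeline_b = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline_b",
        location=LOCATION,
        instance_name=INSTANCE_NAME,
        pipeline_name="pipeline-B",
    )

    start_pipeline_c = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline_c",
        location=LOCATION,
        instance_name=INSTANCE_NAME,
        pipeline_name="pipeline-C",
    )

    # Task C only runs after tasks A and B have both completed successfully.
    [start_pipeline_a, start_pipeline_b] >> start_pipeline_c
```

Depending on the provider version, the start-pipeline task may return as soon as the pipeline run is started rather than waiting for it to finish, so check the operator documentation in [5] and [6] if pipeline C must only start after A and B have fully completed.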
You can set the schedule interval by checking the Airflow documentation.
Once you have this code saved as a .py file, save it to the Google Cloud Storage DAG folder of your environment.
When the DAG runs, it will execute the tasks that start pipelines A and B, and once both of those tasks have finished it will execute the task that starts pipeline C.
[1] https://cloud.google.com/composer
[2] https://cloud.google.com/composer/docs/how-to/managing/creating
[3] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
[4] https://pypi.org/project/apache-airflow-backport-providers-google/
[5] https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/cloud/operators/datafusion/index.html
[6] https://airflow.readthedocs.io/en/latest/howto/operator/google/cloud/datafusion.html#start-a-datafusion-pipeline