Pipeline Dependencies in Data Fusion

I have three pipelines in Data Fusion, say A, B and C. I want pipeline C to be triggered after both pipeline A and pipeline B have completed. Pipeline triggers only let you put a dependency on a single pipeline. Can this be implemented in Data Fusion?

There are 3 answers

Gonzalo Pérez Fernández (Best Answer)

You can do it using Google Cloud Composer [1]. First, create a new environment in Google Cloud Composer [2]. Once that is done, install a new Python package in your environment [3]; the package you need is "apache-airflow-backport-providers-google" [4].

With this package installed you will be able to use these operators [5]. The one you need is "Start a DataFusion pipeline" [6], which lets you start a pipeline from Airflow.

An example of the Python code would be as follows:

import airflow
import datetime
from airflow import models
from datetime import timedelta
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with models.DAG(
    'composer_DF',
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_args) as dag:

    # One CloudDataFusionStartPipelineOperator task per Data Fusion pipeline.
    A = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="A", 
            instance_name="instance_name", task_id="start_pipelineA",
        )
    B = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="B", 
            instance_name="instance_name", task_id="start_pipelineB",
        )
    C = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="C", 
            instance_name="instance_name", task_id="start_pipelineC",
        )
    # First A then B and then C
    A >> B >> C

You can adjust the schedule and retry intervals as described in the Airflow documentation.

Once you have this code saved as a .py file, upload it to the Cloud Storage DAGs folder of your environment.

When the DAG runs, it will execute task A, then task B once A finishes, and so on.
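Note that the chain above runs the three pipelines strictly one after another. To match the requirement in the question (trigger C only after both A and B have completed), the last dependency line of the DAG can be changed to a fan-in, for example:

    # Run A and B first (they may run in parallel); C starts only after both finish successfully
    [A, B] >> C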

[1] https://cloud.google.com/composer

[2] https://cloud.google.com/composer/docs/how-to/managing/creating

[3] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

[4] https://pypi.org/project/apache-airflow-backport-providers-google/

[5] https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/cloud/operators/datafusion/index.html

[6] https://airflow.readthedocs.io/en/latest/howto/operator/google/cloud/datafusion.html#start-a-datafusion-pipeline

adiideas

You can explore "schedules" set through the CDAP REST APIs. That allows parallel execution of pipelines and has no dependency on Cloud Composer (except for a file-based trigger of the first pipeline in the workflow; for that you would need a Cloud Function or perhaps a Cloud Composer file sensor). A sketch of calling the Data Fusion (CDAP) REST API from Python is shown below.
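For illustration, here is a minimal Python sketch of starting a deployed pipeline through a Data Fusion instance's CDAP REST endpoint; the instance endpoint, namespace and pipeline name below are placeholders to replace with your own values. The schedule endpoints mentioned above are part of the same CDAP REST API and are documented in the CDAP Microservices reference.

import google.auth
import google.auth.transport.requests
import requests

# Placeholder values: the API endpoint comes from
#   gcloud data-fusion instances describe <instance> --location=<region> --format="value(apiEndpoint)"
CDAP_ENDPOINT = "https://<instance>-<project>-dot-<region>.datafusion.googleusercontent.com/api"
NAMESPACE = "default"

def start_pipeline(pipeline_name):
    # Obtain an OAuth2 access token for the application default credentials.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    credentials.refresh(google.auth.transport.requests.Request())

    # A deployed batch pipeline is exposed as a CDAP workflow named DataPipelineWorkflow;
    # POSTing to its start endpoint launches a new run.
    url = (f"{CDAP_ENDPOINT}/v3/namespaces/{NAMESPACE}/apps/"
           f"{pipeline_name}/workflows/DataPipelineWorkflow/start")
    response = requests.post(
        url, headers={"Authorization": f"Bearer {credentials.token}"})
    response.raise_for_status()

start_pipeline("C")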

narendra solanki

There is no direct way I can think of, but there are two workarounds.

Workaround 1: Merge pipelines A and B into a single pipeline AB, then trigger pipeline C (AB > C).

Pipeline A - (GCS Copy > Decompress), Pipeline B - (GCS2 > thrashsad)

A BigQueryExecute stage is added to mitigate the error: "Invalid DAG. There is an island made up of stages ...".

In BigQueryExecute, use any valid dummy query.

Merging the two pipelines into one can make pipeline testing harder. To overcome this, you can add a dummy condition so that you can run one pipeline at a time.

  1. In BigQueryExecute, change the query to 'Select ${flag}' and pass the value of flag as a runtime argument, or use 'Select 1 as flag' and set "Row As Arguments" to true.
  2. Add a Condition plugin after BigQueryExecute with the condition runtime['flag'] = 1.
  3. The Condition plugin has two outlets; connect them to pipeline A and pipeline B.

Workaround 2: Store a flag for both pipelines (A & B) in a BigQuery table, and create two triggers, A > C and B > C, to trigger pipeline C. This triggers pipeline C twice, but with a BigQueryExecute and a Condition plugin it will only proceed when both flags are present in the BigQuery table.

How?

  1. In pipelines A & B, write an output row to the BigQuery table 'Pipeline_Run'.
  2. In pipeline C, add a BigQueryExecute with the query 'select count(*) as cnt from ds.Pipeline_Run' and set "Row As Arguments" to true.
  3. In pipeline C, add a Condition plugin that checks whether the value of cnt is 2 (runtime['cnt'] = 2), and connect the rest of the pipeline's plugins to its "Yes" outlet.