How to execute a Google Data Fusion pipeline from event-based CDAP triggers

Is there any way to run a Google Data Fusion pipeline from CDAP event-based triggers?

The 1st requirement is that whenever a new file arrives in a GCS bucket, it should automatically trigger the Data Fusion pipeline to run.

The 2nd requirement is pipeline dependency: for example, Pipeline B cannot run if Pipeline A has not started or has failed.

Thanks

1 Answer

Answered by Nick_Kh

Reviewing your initial use case, I assume that for the 2nd requirement you might consider looking at pure CDAP components: Schedules, Workflows, and Triggers.

Generally, to design a run flow for the underlying pipelines with a conditional execution schema, you create a Schedule object that defines the specific Workflow holding the logical combination of conditions between the pipelines, and apply a Trigger model that matches your event occurrence.

According to the CDAP documentation:

Workflows can be controlled by the CDAP CLI and the Lifecycle HTTP RESTful API.

With the above in mind, you need to compose an appropriate HTTP request to the CDAP REST API containing a JSON object that stores the details of the schedule to be created. Based on the example from the documentation, I've created the following schedule definition for reference, in which Pipeline_2 triggers only when Pipeline_1 succeeds:

{
  "name": "Schedule_1",
  "description": "Triggers Pipeline_2 on the succeeding execution of Pipeline_1",
  "namespace": "<Pipeline_2-namespace>",
  "application": "Pipeline_2",
  "version": "<application version of Pipeline_2>",
  "program": {
    "programName": "Workflow_name",
    "programType": "WORKFLOW"
  },
  "trigger": {
    "type": "PROGRAM_STATUS",
    "programId": {
      "namespace": "<Pipeline_1-namespace>",
      "application": "Pipeline_1",
      "version": "<application version of Pipeline_1>",
      "type": "WORKFLOW",
      "entity": "PROGRAM",
      "program": "Workflow_name"
    },
    "programStatuses": ["COMPLETED"]
  }
}
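
As a minimal sketch of submitting this object, the snippet below (Python, using the google-auth client library) creates the schedule via the Lifecycle REST API's PUT endpoint and then enables it; the instance endpoint, the default namespace, and the local file name are assumptions to adapt to your setup:

import json

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholder: the apiEndpoint of your Data Fusion instance, e.g. from
# `gcloud beta data-fusion instances describe`.
CDAP_API = "https://<instance-api-endpoint>"

credentials, _ = google.auth.default()
session = AuthorizedSession(credentials)  # attaches the OAuth bearer token

# The schedule definition shown above, saved locally as schedule_1.json.
with open("schedule_1.json") as f:
    schedule = json.load(f)

url = f"{CDAP_API}/v3/namespaces/default/apps/Pipeline_2/schedules/Schedule_1"
session.put(url, json=schedule).raise_for_status()  # create the schedule
session.post(f"{url}/enable").raise_for_status()    # enable it so it can fire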

For the 1st requirement, I'm not sure it is feasible to achieve with native Data Fusion/CDAP instruments, since I don't see an event type that matches continuous discovery of new files in a GCS bucket:

Triggers are fired by events such as creation of a new partition in a dataset, or fulfillment of a cron expression of a time trigger, or the status of a program.

In such a case I would look at GCP Cloud Functions and GCP Composer. A nicely written example depicts how to use Cloud Functions for event-based DAG triggers, and within the Composer DAG file you can invoke sequential Data Fusion pipeline executions. Check out this Stack thread for more details.
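
If your dependency chain is simple enough, a background Cloud Function could even start the Data Fusion pipeline directly through the CDAP REST API, without Composer in between. A minimal sketch, assuming a Python 3 function subscribed to the bucket's google.storage.object.finalize event (the endpoint, pipeline name, and runtime-argument keys below are placeholders):

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholders: your instance's apiEndpoint and the deployed pipeline name.
CDAP_API = "https://<instance-api-endpoint>"
PIPELINE = "Pipeline_1"

def on_new_file(event, context):
    """Entry point for the google.storage.object.finalize trigger."""
    credentials, _ = google.auth.default()
    session = AuthorizedSession(credentials)  # attaches the OAuth bearer token
    # Deployed batch pipelines run as a CDAP workflow named DataPipelineWorkflow.
    url = (f"{CDAP_API}/v3/namespaces/default/apps/{PIPELINE}"
           "/workflows/DataPipelineWorkflow/start")
    # Forward the new object's location as runtime arguments (key names are
    # illustrative; match them to the macros used inside the pipeline).
    resp = session.post(url, json={"bucket": event["bucket"],
                                   "file": event["name"]})
    resp.raise_for_status()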