How to use ExitHandler with Kubeflow Pipelines SDK v2


I'm trying to move all my Kubeflow Pipelines from the previous SDK v1 (kfp) to the newer Pipelines SDK v2 (kfp.v2). I'm using version 1.8.12. This refactoring has proved successful for almost all code, except for the ExitHandler, which still exists: from kfp.v2.dsl import ExitHandler. It seems like the previous way of compiling the pipeline object into a tar.gz file using kfp.compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz') preserved some type of Argo placeholders, while the new .json pipelines compiled with compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json") don't work the same way. Below, I go into detail about what works in Pipelines SDK v1 and how I've tried to implement it in v2.

Previously, using Kubeflow Pipelines v1, I could use an ExitHandler as shown in this StackOverflow question to, for example, send a message to Slack when one of the pipeline components failed. I would define the pipeline as

import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
                    'python3', 'main.py',
                    '--message', 'Basic-pipeline failed',
                    '--status', "{{workflow.status}}"
                  ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')

where the exit_task would send the message to our Slack if any of the steps of the pipeline failed. The code for the exit_task image looks like

import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)

This worked because the underlying Argo workflow understands the "{{workflow.status}}" placeholder and substitutes the actual workflow status at runtime.

However, I'm now trying to use Vertex AI to run the pipeline, leveraging the Kubeflow Pipelines SDK v2, kfp.v2. Using the same exit-handler image as before, 'eu.gcr.io/.../send-error-msg-to-slack', I now define a yaml component file (exit_handler.yaml) instead,

name: Exit handler
description: Prints to Slack if any step of the pipeline fails

inputs:
  - {name: message, type: String}
  - {name: status, type: String}

implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]

The pipeline code now looks like this instead,

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)

        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )
    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()

This compiles and runs without exceptions, but the exit-handler component receives the status as the literal string {{workflow.status}} rather than the actual pipeline status. You can also see this in the compiled pipeline JSON generated from the code above (basic-pipeline.json), excerpted below ("stringValue": "{{workflow.status}}"):

...
         "exit-handler": {
            "componentRef": {
              "name": "comp-exit-handler"
            },
            "dependentTasks": [
              "exit-handler-1"
            ],
            "inputs": {
              "parameters": {
                "message": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "Basic pipeline failed."
                    }
                  }
                },
                "status": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "{{workflow.status}}"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "exit-handler"
            },
            "triggerPolicy": {
              "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
            }
          }
...

Any idea how I can refactor my old ExitHandler code from SDK v1 to the new SDK v2, so that the exit handler understands whether the pipeline failed or not?


There are 2 answers

IronPan

This is probably not yet fully documented, but in v2 we introduced a different variable, PipelineTaskFinalStatus, which can be automatically populated for you so that you can send it to your Slack channel.

Here is an example of the exit handler in the official doc https://cloud.google.com/vertex-ai/docs/pipelines/email-notifications#sending_a_notification_from_a_pipeline

And here is the corresponding email notification component https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/v1/vertex_notification_email/component.yaml

You can write your own component with the following parameter, which will be automatically populated when the exit handler runs.

inputs:
...
  - name: pipeline_task_final_status
    type: PipelineTaskFinalStatus

(Note that this feature is not yet available in the Kubeflow Pipelines open-source distribution; it will be available in KFP v2. For now it is only available in the Vertex AI Pipelines distribution.)
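
For illustration, here is a minimal sketch of such an exit component written as a Python function-based component instead of a YAML container component. Assumptions: your installed SDK version exposes PipelineTaskFinalStatus under kfp.v2.dsl, the component name exit_op is made up for this example, and the Slack-posting code is a placeholder you would replace with your own.

from kfp.v2.dsl import component, PipelineTaskFinalStatus

@component
def exit_op(message: str, status: PipelineTaskFinalStatus):
    """Exit task: posts to Slack if the pipeline did not succeed.

    The `status` parameter is populated automatically by the backend
    when the exit handler runs; it is not passed in the pipeline code.
    """
    if status.state != 'SUCCEEDED':
        # Placeholder for your own Slack-posting logic.
        print(f'{message} '
              f'state={status.state}, '
              f'task={status.pipeline_task_name}, '
              f'error={status.error_message}')

A function-based component is used here only to keep the sketch short; the same PipelineTaskFinalStatus-typed input can also be declared in a YAML container component as in the snippet above, though you would then need to handle how the status value is serialized into the container arguments.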

chesu

The replacement for "{{workflow.status}}" in KFP SDK v2 is the special type annotation PipelineTaskFinalStatus, as IronPan mentioned above.

Its usage is documented in https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/pipelines/#dslexithandler
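
Adapted to the pipeline in the question, the wiring would look roughly like the sketch below. Assumptions: PipelineTaskFinalStatus is available in your kfp.v2 version, exit_op is a hypothetical function-based exit component like the one sketched in the previous answer, and comp_0.yaml is the question's own component file. Note that only message is passed to exit_op; the status parameter is filled in automatically when the exit handler runs.

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler, component, PipelineTaskFinalStatus
from kfp.components import load_component_from_file

@component
def exit_op(message: str, status: PipelineTaskFinalStatus):
    # Minimal stand-in; see the fuller Slack sketch in the answer above.
    print(message, status.state, status.error_message)

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    # `status` is intentionally not passed here; it is populated at runtime.
    exit_task = exit_op(message="Basic pipeline failed.")
    with ExitHandler(exit_task):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step_0 = step_0_spec(...)  # same arguments as before

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )

Inside the exit component you can then check status.state (e.g. whether it equals 'SUCCEEDED') instead of comparing a string against "Failed" as in the v1 image.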