I'm trying to migrate all my Kubeflow Pipelines from the previous SDK v1 (kfp) to the newer Pipelines SDK v2 (kfp.v2). I'm using version 1.8.12. This refactoring has proved successful for almost all the code, except for the ExitHandler, which still exists: from kfp.v2.dsl import ExitHandler. It seems that the previous way of compiling the pipeline object into a tar.gz file using kfp.compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz') preserved some type of Argo placeholders, while the new .json pipelines compiled with compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json") don't work the same way. Below, I will go into detail about what works in Pipelines SDK v1 and how I've tried to implement it in v2.
Previously, using Kubeflow Pipelines v1, I could use an ExitHandler as shown in this StackOverflow question to, e.g., send a message to Slack when one of the pipeline components failed. I would define the pipeline as
import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
            'python3', 'main.py',
            '--message', 'Basic-pipeline failed',
            '--status', "{{workflow.status}}"
        ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')
where the exit_task would send the message to our Slack if any of the steps of the pipeline failed. The code for the exit_task image looks like
import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)
This worked because the underlying Argo workflow could understand the "{{workflow.status}}" placeholder.
However, I'm now trying to use Vertex AI to run the pipeline, leveraging the Kubeflow Pipelines SDK v2, kfp.v2. Using the same exit-handler image as before, 'eu.gcr.io/.../send-error-msg-to-slack', I now define a yaml component file (exit_handler.yaml) instead:
name: Exit handler
description: Prints to Slack if any step of the pipeline fails
inputs:
- {name: message, type: String}
- {name: status, type: String}
implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]
The pipeline code now looks like this instead,
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)
        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )

    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")

    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()
This "works" (no exceptions) to compile and run, but the ExitHandler code receives the status as the literal string {{workflow.status}}, which is also visible in the compiled pipeline json generated from the code above (basic-pipeline.json), shown below ("stringValue": "{{workflow.status}}"):
...
"exit-handler": {
    "componentRef": {
        "name": "comp-exit-handler"
    },
    "dependentTasks": [
        "exit-handler-1"
    ],
    "inputs": {
        "parameters": {
            "message": {
                "runtimeValue": {
                    "constantValue": {
                        "stringValue": "Basic pipeline failed."
                    }
                }
            },
            "status": {
                "runtimeValue": {
                    "constantValue": {
                        "stringValue": "{{workflow.status}}"
                    }
                }
            }
        }
    },
    "taskInfo": {
        "name": "exit-handler"
    },
    "triggerPolicy": {
        "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
    }
}
...
Any idea how I can refactor my old ExitHandler code from v1 to the new SDK v2, so that the exit handler understands whether the status of my pipeline is failed or not?
This is probably not yet fully documented, but in V2 we introduced a different variable, PipelineTaskFinalStatus, that can be automatically populated for you to send it to your Slack channel.
Here is an example of the exit handler in the official doc: https://cloud.google.com/vertex-ai/docs/pipelines/email-notifications#sending_a_notification_from_a_pipeline
And here is the corresponding email notification component: https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/v1/vertex_notification_email/component.yaml
You can write your own component with the following parameter, which will be automatically populated when the exit handler runs.
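As a sketch (modeled on the notification component linked above, which declares an input of this type), the exit_handler.yaml from the question would change only the type of the status input, from String to PipelineTaskFinalStatus:

```yaml
name: Exit handler
description: Prints to Slack if any step of the pipeline fails
inputs:
- {name: message, type: String}
- {name: status, type: PipelineTaskFinalStatus}
implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]
```

With this type, the pipeline code no longer passes "{{workflow.status}}" for status; the platform injects the final status itself.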
(Note: this feature is not yet available in the Kubeflow Pipelines open-source distribution and will arrive in KFP V2. It's currently only available in the Vertex Pipelines distribution.)
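On the container side, the status then arrives as a JSON-serialized PipelineTaskFinalStatus rather than a bare string, so main.py has to parse it. A minimal sketch of the updated image code, assuming the field name "state" and values like "SUCCEEDED"/"FAILED" (verify the exact schema against the docs linked above):

```python
import argparse
import json

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def is_failure(status_json: str) -> bool:
    # PipelineTaskFinalStatus is passed as a JSON string,
    # e.g. '{"state": "FAILED", "error": {...}, ...}'
    return json.loads(status_json).get('state') == 'FAILED'

def main(FLAGS):
    def post_to_slack(msg):
        ...  # unchanged from the original image

    if FLAGS.status is not None and is_failure(FLAGS.status):
        post_to_slack(FLAGS.message)

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)
```

The only behavioral change from the v1 image is the comparison: instead of FLAGS.status == "Failed" against Argo's plain string, the handler decodes the injected JSON and checks its state field.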