The DAG failed because some tasks failed. The failed tasks are [concat]


I am getting an AiPlatformException: code=RESOURCE_EXHAUSTED, message = "The following quota metrics exceed the existing quota", even though my pipeline is a basic pipeline of 2 components, so it is not a heavy task. The task state shows DRIVER_SUCCEEDED.

import kfp
from kfp.dsl import pipeline
from kfp.dsl import component
from kfp import compiler

from collections import namedtuple
from typing import NamedTuple
from google.cloud import aiplatform
import logging

PROJECT_ID = "practice-training"

aiplatform.init(project=PROJECT_ID, location='asia-south1')

I am importing all the necessary modules required for running a pipeline.

# Create components
@component(base_image='python:3.10.12')
def concat(a: str, b: str) -> str:
    return a + b

@component()
def reverse(a: str) -> NamedTuple("outputs", [("before", str), ("after", str)]):
    return a, a[::-1]

# Create Pipeline
@pipeline(
    name="basic-pipeline",
    pipeline_root='gs://softys-pipeline-artifacts',
    description="My First Pipeline"
)
def basic_pipeline(x: str = "stres", y: str = "sed"):   # 2 pipeline parameters
    concat_task = concat(a=x, b=y)                      # pipeline parameters are the inputs of the first component
    reverse_task = reverse(a=concat_task.output)        # output of the first component is the input of the second component

Here I created 2 basic components, one that concatenates two strings and one that reverses a string. Then I created a basic pipeline with 2 parameters, where the pipeline parameters are the inputs of the first component, and the output of the first component is the input of the second component.
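Since both components are plain Python functions under the @component decorator, their logic can be sanity-checked locally before submitting anything to Vertex AI. A minimal sketch of the undecorated bodies only, so no KFP runtime or GCP project is needed (concat_fn and reverse_fn are hypothetical local names, not part of the pipeline):

```python
from collections import namedtuple
from typing import NamedTuple

def concat_fn(a: str, b: str) -> str:
    # Same body as the concat component.
    return a + b

def reverse_fn(a: str) -> NamedTuple("outputs", [("before", str), ("after", str)]):
    # Same body as the reverse component, with the named tuple built explicitly.
    outputs = namedtuple("outputs", ["before", "after"])
    return outputs(before=a, after=a[::-1])

joined = concat_fn("stres", "sed")
result = reverse_fn(joined)
print(result.before, result.after)  # stressed desserts
```

If this runs cleanly, the component code itself is not the source of the RESOURCE_EXHAUSTED error, which points at the platform side rather than the task logic.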

Then I compiled the pipeline and ran it using the following code.

compiler.Compiler().compile(
    pipeline_func=basic_pipeline, package_path="basic_pipeline.yaml")

I have tried the package path both ways, as YAML and as JSON.

from google.cloud.aiplatform import pipeline_jobs

job = pipeline_jobs.PipelineJob(
    display_name="basic-pipeline",
    template_path="basic_pipeline.yaml",
    parameter_values={"x": "stres", "y": "sed"},
    enable_caching=True
)
job.submit()

We can also use job.run(sync=False) instead of job.submit()

What can I do to resolve this issue?


There are 0 answers