Setting parallelism of `ParallelFor` in Vertex AI dynamically, based on the pipeline parameter


I have a Vertex AI pipeline that takes a country argument (a string such as "uk" or "de"). The first task of the pipeline, `get_country_dr_keys`, generates all the "dr keys" (what that stands for is irrelevant here) that need to be processed for that country. Its output then feeds into `ParallelFor`, i.e. the pipeline branches out once for each dr key.

What I'd like to do is set the parallelism of that `ParallelFor` loop based on the country, i.e. something like this:

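    from kfp import dsl
    from kfp.dsl import ParallelFor

    # get_country_dr_keys, work_out_parallelism and jxp_dr_pipeline are
    # defined elsewhere in the project.
    @dsl.pipeline
    def jxp_country_pipeline(country_str: str):
        get_country_dr_keys_task = get_country_dr_keys(country_str=country_str)

        dr_keys_loop = ParallelFor(
            items=get_country_dr_keys_task.output,
            name="jxp-dr-loop",
            # Desired: parallelism chosen per country. This call runs at
            # compile time, when country_str is only a placeholder.
            parallelism=work_out_parallelism(country_str=country_str),
        )
        with dr_keys_loop as dr_key:
            jxp_dr_pipeline(dr_key=dr_key)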

The problem is that the pipeline does not compile. It seems that the parallelism is fixed at compile time, i.e. the function `work_out_parallelism` is run at compile time, when the value of the `country_str` parameter isn't available yet.
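A toy, pure-Python model of that behaviour may make it clearer. `Placeholder` and `compile_pipeline` are hypothetical stand-ins (not KFP APIs, and not how KFP is actually implemented), and the per-country numbers are made up. The point is only that an ordinary helper called inside the pipeline body runs once, at compile time, with whatever object it is handed at that moment.

```python
"""Toy model of compile-time evaluation (NOT KFP's real implementation)."""

# Hypothetical per-country limits; the numbers are illustrative only.
PARALLELISM_BY_COUNTRY = {"uk": 4, "de": 8}


class Placeholder:
    """Hypothetical stand-in for what a pipeline parameter is during compilation."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"Placeholder({self.name!r})"


def work_out_parallelism(country_str) -> int:
    # An ordinary Python function: it runs as soon as it is called.
    return PARALLELISM_BY_COUNTRY[country_str]


def compile_pipeline(country_str) -> dict:
    # Like a pipeline compiler, this evaluates the body once and records
    # whatever value was passed as `parallelism` at that moment.
    return {"parallelism": work_out_parallelism(country_str)}


# Compiling with a real string works: the value is baked into the spec.
print(compile_pipeline("uk"))

# Compiling with a placeholder fails: there is no "uk" or "de" yet.
try:
    compile_pipeline(Placeholder("country_str"))
except KeyError as err:
    print("compile failed, no entry for", err)
```

Under this model, if the country is already known when the pipeline is submitted, one possible workaround is to compile a separate pipeline per country, so that the lookup receives a real string instead of a placeholder.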

Is there a way to determine and set the parallelism dynamically, e.g. based on the value of the pipeline parameter `country_str`, or at least on the length of the list returned by `get_country_dr_keys` (the items that `dr_keys_loop` iterates over)?

