Azure Machine Learning SDK V2 - Sequencing pipeline steps with sweep job


We are trying to migrate our Azure ML pipelines from SDK v1 to SDK v2. The problem we are facing is that we cannot sequence the pipeline steps the way we used to in SDK v1.

In SDK v1, we could simply do this:

step_1 = command(
    environment=azureml_env,
    command="python step1.py",
    code="./my_project_folder",
    outputs=dummy_output,
)
step_2 = command(
    environment=azureml_env,
    command="python step2.py",
    code="./my_project_folder",
    inputs=dummy_output,
    outputs=dummy_output2,
)
StepSequence(steps=[step_1, step_2])
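
For anyone less familiar with the v1 pattern: a minimal sketch of the equivalent PythonScriptStep / StepSequence calls (the compute target and workspace wiring here are illustrative, not our actual code):

from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PipelineData, StepSequence
from azureml.pipeline.steps import PythonScriptStep

ws = Workspace.from_config()
dummy_output = PipelineData("dummy_output", datastore=ws.get_default_datastore())

step_1 = PythonScriptStep(
    script_name="step1.py",
    source_directory="./my_project_folder",
    outputs=[dummy_output],
    compute_target="cpu-cluster",  # illustrative
)
step_2 = PythonScriptStep(
    script_name="step2.py",
    source_directory="./my_project_folder",
    inputs=[dummy_output],
    compute_target="cpu-cluster",
)

# StepSequence enforces execution order even when there is no data dependency
pipeline = Pipeline(workspace=ws, steps=StepSequence(steps=[step_1, step_2]))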

But with SDK v2, we have to use a command component for each pipeline step. Here is how we are creating the command component:

image_creator_component = command(
    name="sweden-backbook-image-creator",
    display_name="Image Creator",
    description="Image creator step",
    inputs={
        "port_agg_col": Input(type='string'),
        "tgt_period_col": Input(type='string'),
    },
    code=e.image_creator_sources_directory,
    command="python -m image_creator.image_creator",
    environment=pipeline_job_env,
).component(
    port_agg_col=port_agg_col,
    tgt_period_col=tgt_period_col,
)

Note: We have to use the .component method and provide values for the inputs, otherwise the pipeline complains that no argument is provided for the input.
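
For illustration, the two-stage pattern looks roughly like this (a sketch with a made-up input value): command(...) builds a Command, .component exposes the underlying CommandComponent, and calling that component with keyword arguments binds the inputs and yields a pipeline node.

# Sketch of the two-stage pattern (input value is illustrative):
cmd = command(
    command="python -m image_creator.image_creator",
    code=e.image_creator_sources_directory,
    environment=pipeline_job_env,
    inputs={"port_agg_col": Input(type='string')},
)
node = cmd.component(port_agg_col="portfolio_id")  # binding a value here avoids the
                                                   # "no argument provided for input" error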

I am trying to integrate the clever solution provided here: Azure Machine Learning SDK V1 migration to V2 Pipeline Steps (the one recommended by Microsoft Azure Collective).

However, this solution works fine with plain command steps, but we have a sweep job step in our pipeline, so this is how I have tried to integrate it:

  1. Added is_sweep and sweep_properties to the Step class (see the full Step class code at the end).

  2. Because it complains if a value is not provided for the inputs, I added another method to the Step class:

    def configure_inputs(self, **kwargs):
        self.input_configuration = kwargs
    
  3. To configure the sweep step, I added another method:

def configure_sweep(self):
    command_step = command(**self.__dict__).sweep(
        primary_metric=self.sweep_properties['primary_metric'],
        goal=self.sweep_properties['goal'],
        sampling_algorithm=self.sweep_properties['sampling_algorithm'],
        search_space=self.sweep_properties['search_space'],
        compute=self.sweep_properties['compute'],
    )

    command_step.set_limits(
        max_total_trials=self.sweep_properties['max_total_trials'],
        max_concurrent_trials=self.sweep_properties['max_concurrent_trials'],
        timeout=self.sweep_properties['timeout'],
    )
    return command_step
  4. The build method changed to the following so it doesn't complain about missing input values:
command_dict[str(step)] = step.build().component(**step.input_configuration)(**inputs_dict)

Here is how I create the component for the sweep job:

train_component = Step(
    name="sweden-backbook-train",
    display_name="Bayesian_HPO",
    description="Performs training and produces model",
    inputs={
        "reg_targets": Input(type='string'),
        "clf_targets": Input(type='string'),
        "ml_datafile": Input(type='string'),
        "preprocessing_config_file": Input(type='string'),
        "port_agg_col": Input(type='string'),
        "tgt_period_col": Input(type='string'),
        "gpu_flag": Input(type='string'),
        "dataset_deploy_flag": Input(type='string'),
        # ... lots of other inputs that should be provided by the sweep job ...
    },
    code=e.sources_directory,
    command="python -m train.main_portfolio",
    environment=pipeline_job_env,
    is_sweep=True,
    sweep_properties={
        'primary_metric': "mae_mean_cv",
        'goal': "minimize",
        'sampling_algorithm': 'bayesian',
        'search_space': search_space,
        'compute': e.compute_name_train,
        'max_total_trials': int(e.max_total_trials),
        'max_concurrent_trials': int(e.max_concurrent_trials),
        'timeout': int(e.timeout),
    },
)
train_component.configure_inputs(
    reg_targets=reg_targets,
    clf_targets=clf_targets,
    ml_datafile=ml_datafile,
    preprocessing_config_file=preprocessing_config_file,
    port_agg_col=port_agg_col,
    tgt_period_col=tgt_period_col,
    gpu_flag=gpu_flag,
    dataset_deploy_flag=dataset_deploy_flag,
)

# configure sweep
sweep_step = train_component.configure_sweep()

When running this pipeline, I get an error in the create_pipeline function of the proposed solution:

Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/vsts/work/1/s/ml_service/components/cmp_train.py", line 401, in <module>
    main(args)
  File "/home/vsts/work/1/s/ml_service/components/cmp_train.py", line 314, in main
    raise excp
  File "/home/vsts/work/1/s/ml_service/components/cmp_train.py", line 311, in main
    pipeline_job = create_pipeline(pipeline_steps, default_compute=e.compute_name_train if args.gpu_flag else e.compute_name_preprocess, name="my_pipeline", experiment_name=e.experiment_name_train)
  File "/home/vsts/work/1/s/ml_service/utils/pipeline_builder.py", line 206, in create_pipeline
    return default_pipeline()
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/azure/ai/ml/dsl/_pipeline_decorator.py", line 203, in wrapper
    pipeline_component = pipeline_builder.build(
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/azure/ai/ml/dsl/_pipeline_component_builder.py", line 187, in build
    outputs, _locals = get_outputs_and_locals(self.func, kwargs)
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/azure/ai/ml/_utils/_func_utils.py", line 471, in get_outputs_and_locals
    return _get_persistent_locals_builder().call(func, _all_kwargs)
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/azure/ai/ml/_utils/_func_utils.py", line 63, in call
    return self._call(func, _all_kwargs)
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/azure/ai/ml/_utils/_func_utils.py", line 109, in _call
    outputs = func(**_all_kwargs)
  File "/home/vsts/work/1/s/ml_service/utils/pipeline_builder.py", line 183, in default_pipeline
    step.update_link(
TypeError: '_AttrDict' object is not callable

I believe it is because the command step is converted to a sweep type, which is not callable. I would appreciate any solution or clue to solve this.
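
For context, the documented SDK v2 pattern applies .sweep() to a command node after its inputs are bound inside the pipeline, and the resulting sweep node is not callable again. A minimal sketch of that pattern (environment, compute, paths, and the metric value are illustrative, not from our codebase):

from azure.ai.ml import command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.sweep import Uniform

train_cmd = command(
    code="./src",  # illustrative
    command="python train.py --lr ${{inputs.lr}}",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    inputs={"lr": 0.01},
)

@pipeline(default_compute="cpu-cluster")
def sweep_pipeline():
    trial = train_cmd(lr=Uniform(min_value=0.001, max_value=0.1))  # bind the search space
    sweep_node = trial.sweep(
        primary_metric="mae_mean_cv",
        goal="minimize",
        sampling_algorithm="bayesian",
    )
    sweep_node.set_limits(max_total_trials=20, max_concurrent_trials=4, timeout=7200)
    # sweep_node is a Sweep node, not a component factory; calling it raises TypeError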

The rest is the same as provided by the solution: Azure Machine Learning SDK V1 migration to V2 Pipeline Steps

Here is the full adapted code from @BeGreen:

from collections import OrderedDict
from pathlib import Path
from typing import List
import random
import string

from azure.ai.ml import Input, Output, command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import BuildContext, Environment


class StepsGraph:
    def __init__(self):
        """
        Initialize a StepsGraph instance to manage step dependencies.

        The StepsGraph uses an ordered dictionary to store steps and their dependencies.
        """
        self.steps = OrderedDict()

    def add_edges(self, step_1, step_2):
        """
        Add a dependency relationship between two steps.

        Args:
            step_1: The first step.
            step_2: The step that depends on the first step.
        """
        if step_1 not in self.steps:
            self.steps[step_1] = []
        if step_2 not in self.steps:
            self.steps[step_2] = []
        self.steps[step_1].append(step_2)

    def get_dependency(self):
        """
        Get the steps in the order of their dependencies.

        Returns:
            List: A list of steps in the order they need to be executed to satisfy all dependencies.
        """

        def dfs(node, visited, result):
            visited[node] = True
            if node in self.steps:
                for neighbor in self.steps[node]:
                    if not visited[neighbor]:
                        dfs(neighbor, visited, result)
            result.append(node)

        visited = {step: False for step in self.steps}
        result = []

        for step in self.steps:
            if not visited[step]:
                dfs(step, visited, result)

        return result[::-1]

    def get_parents(self, step):
        """
        Get the steps that are dependent on a given step.

        Args:
            step: The step to find dependent steps for.

        Returns:
            List: A list of steps that depend on the given step.
        """
        parents = []
        for s, connections in self.steps.items():
            if step in connections:
                parents.append(s)
        return parents

    def print_steps(self):
        for step, edges in self.steps.items():
            print(f"Step {step} -> {edges}")


def create_input(step):
    """
    Create an input dictionary for a step.

    Args:
        step (str): The name of the step for which to create an input.

    Returns:
        dict: A dictionary representing the input for the specified step with the following structure:
            {step: Input(type="uri_folder", mode="rw_mount")}
    """
    return {f"{step}": Input(type="uri_folder", mode="rw_mount")}


def create_output(step):
    """
    Create an output dictionary for a step.

    Args:
        step (str): The name of the step for which to create an output.

    Returns:
        dict: A dictionary representing the output for the specified step with the following structure:
            {step: Output(type="uri_folder", mode="rw_mount")}
    """
    return {f"{step}": Output(type="uri_folder", mode="rw_mount")}


def create_pipeline(steps_graph, default_compute, name, experiment_name):
    """
    Create a pipeline with specified steps and dependencies.

    Args:
        steps_graph (Step or StepsGraph): A Step or StepsGraph object representing the steps and their dependencies in the pipeline.
            If a Step is provided, it will be treated as a standalone step.
        default_compute: The default compute target for the pipeline (or None for serverless execution).
        name (str): The name of the pipeline.
        experiment_name (str): The name of the experiment associated with the pipeline.

    Returns:
        Callable: A callable function representing the created pipeline.

    Raises:
        ValueError: If `name` or `experiment_name` is not provided.

    Note:
        - The `steps_graph` argument can be a single Step or a StepsGraph object.
        - The pipeline's structure is determined by the dependencies defined in the `steps_graph`.
        - The pipeline is created as a Python function and can be executed when called.

    Example:
        # Create a pipeline with specific steps and dependencies

        steps_graph = StepsGraph()
        step1 = Step(...)
        step2 = Step(...)
        step3 = Step(...)

        steps_graph.add_edges(step_1, step_2)
        steps_graph.add_edges(step_2, step_3)
        steps_graph.add_edges(step_2, step_4)
        steps_graph.add_edges(step_2, step_6)
        steps_graph.add_edges(step_4, step_5)
        steps_graph.add_edges(step_3, step_7)
        steps_graph.add_edges(step_6, step_7)
        steps_graph.add_edges(step_5, step_7)

        pipeline_job = create_pipeline(steps_graph, default_compute="my_compute", name="my_pipeline", experiment_name="my_experiment")
    """
    # default_compute = None => Serverless
    if not name:
        raise ValueError("Please provide a `name` for your pipeline.")
    if not experiment_name:
        raise ValueError("Please provide an `experiment_name` for your pipeline.")

    @pipeline(
        default_compute=default_compute,
        name=experiment_name,
        experiment_name=experiment_name,
    )
    def default_pipeline():
        if isinstance(steps_graph, Step):
            steps_graph.build()()
            return
        dependency_order = steps_graph.get_dependency()
        command_dict = {}
        parent_dict = {}

        for step, edges in steps_graph.steps.items():
            print(f"Step {step} -> {edges}")
            parent_dict[str(step)] = steps_graph.get_parents(step)

        print(f"parent_dict : {parent_dict}")
        print(f"dependency_oder: {dependency_oder}")

        print('=================================')
        for step in dependency_order:
            print('=================================')
            print(f"step : {step}")
            print('----------------Type')
            print(type(step))
            inputs_dict = {}
            step.update_link(
                outputs=create_output(step),
            )
            for parent_node in reversed(parent_dict[str(step)]):
                step.update_link(
                    inputs=create_input(parent_node),
                )
                custom_output = getattr(
                    command_dict[str(parent_node)].outputs, str(parent_node)
                )
                input_name = list(parent_node.outputs.keys())[
                    0
                ]  # Because we know we have only one output per steps
                inputs_dict[input_name] = custom_output

            print(inputs_dict)

            for key, value in inputs_dict.items():
                print(key, value._port_name)

            print(step.inputs)
            command_dict[str(step)] = step.build().component(**step.input_configuration)(**inputs_dict)

    return default_pipeline()


def generate_custom_uuid(length=8, parts=4):
    custom_uuid = ""
    for _ in range(parts):
        part = "".join(random.choices(string.ascii_letters + string.digits, k=length))
        custom_uuid += part + "_"

    custom_uuid = custom_uuid[:-1]
    return custom_uuid


class Step:
    """
    Represents a step in a StepsGraph.

    This class is used to define and manage the properties of a step,
    including its inputs and outputs. It provides methods for updating
    the input and output links and for building the step's command.

    Attributes:
        inputs (dict): A dictionary of input values for the step.
        outputs (dict): A dictionary of output values for the step.

    Methods:
        __init__(self, **kwargs): Initializes a Step object with optional
            keyword arguments to set initial properties.
        __str__(self): Returns a string representation of the step.
        update_link(self, inputs=None, outputs=None): Updates the input and
            output links with the provided dictionaries.
        build(self): Builds and returns the command for executing the step.

    Example usage:
    >>> my_step = Step(name="Sample Step", inputs={"input_1": "value1"})
    >>> my_step.update_link(outputs={"output_1": "result"})
    >>> command = my_step.build()
    >>> # Then you need to call the command to build the inputs/outputs. Use `create_pipeline` for this.
    """

    def __init__(self, is_sweep=False, sweep_properties=None, **kwargs):
        self.inputs = None
        self.outputs = None
        self.__dict__.update(kwargs)
        self.uuid = self.display_name + "_" + generate_custom_uuid()
        self.is_sweep = is_sweep
        self.sweep_properties = sweep_properties or {}  # avoid sharing a mutable default dict

    def __str__(self):
        return self.uuid

    def update_link(self, inputs=None, outputs=None):
        if self.inputs and inputs:
            self.inputs.update(inputs)
        elif inputs:
            self.inputs = inputs
        if self.outputs and outputs:
            self.outputs.update(outputs)
        elif outputs:
            self.outputs = outputs

    def configure_sweep(self):
        command_step = command(**self.__dict__).sweep(
            primary_metric=self.sweep_properties['primary_metric'],
            goal=self.sweep_properties['goal'],
            sampling_algorithm=self.sweep_properties['sampling_algorithm'],
            search_space=self.sweep_properties['search_space'],
            compute=self.sweep_properties['compute']
        )

        command_step.set_limits(
            max_total_trials=self.sweep_properties['max_total_trials'],
            max_concurrent_trials=self.sweep_properties['max_concurrent_trials'],
            timeout=self.sweep_properties['timeout']
        )
        return command_step

    def build(self):
        return command(**self.__dict__)

    def configure_inputs(self, **kwargs):
        self.input_configuration = kwargs