StreamError sharing inputs from Sweep Job to Command Job - AzureML SDKv2

We have AML pipelines (SDK v2) containing three steps: training, image_creation and model_selector. We use command components for all three steps.

All of our steps can run in isolation because we save all data and configuration to storage accounts. But SDK v2 does not yet offer a way to run pipeline steps in a given sequence. We are therefore forced to link the steps through dummy inputs and outputs so that they run in the required order.
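To make the workaround concrete: a step only waits for another step when it consumes one of that step's outputs, so the dummy links form a dependency graph from which the run order follows. Below is a minimal sketch of that idea, independent of AzureML, using Python's standard graphlib module. The step names mirror ours; the dictionary layout and the run_order helper are only for illustration.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose (dummy) output it consumes.
# This illustrates the ordering logic only; it is not AzureML code.
dummy_links = {
    "training": set(),
    "image_creation": {"training"},
    "model_selector": {"image_creation"},
}


def run_order(links):
    """Return the steps in an order that respects every dummy link."""
    return list(TopologicalSorter(links).static_order())


order = run_order(dummy_links)
print(order)
```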

This is my pipeline function that creates the components and pipeline:

@pipeline()
def train_with_hyperparameters(reg_targets, clf_targets):
    ml_client_registry = get_client_registry()
    ml_client = get_client_workspace(..params...)
    pipeline_job_env = get_env_from_shared_registry(...params..)
    
    try:
        # Creating command component
        train_component = command(
            name="train",
            display_name="Bayesian HPO",
            description="Performs training and produces model",
            inputs={
                "reg_targets": Input(type='string'),
                "clf_targets": Input(type='string'),
                ...other inputs here...
            },
            code=e.sources_directory,
            command="python -m train.main_portfolio",
            environment=pipeline_job_env,
            outputs=dict(
                auxilary_output_training = Output(type='uri_file')
            ),
        ).component(
            reg_targets=reg_targets,
            clf_targets=clf_targets,
            ml_datafile=ml_datafile,
            preprocessing_config_file=preprocessing_config_file,
            port_agg_col=port_agg_col,
            tgt_period_col=tgt_period_col,
            gpu_flag=gpu_flag,
            dataset_deploy_flag=dataset_deploy_flag
        ) # type: ignore
        
        search_space = {
            ...lot of parameters here...
        }
        sweep_step = train_component.sweep(
            primary_metric="mean_cv",
            ..settings here...
        )
        sweep_step.set_limits(...limits settings here...)

        #image creator component
        image_creator_component = command(
            name="image-creator",
            display_name="Image Creator",
            description="Image creator step",
            inputs={
                "port_agg_col": Input(type='string'),
                ..other inputs here...,
                "auxilary_link":Input(type='uri_file')
            },
            outputs=dict(
                auxilary_output_image_creator = Output(type='uri_file')
            ),
            code=e.image_creator_sources_directory,
            command="python -m image_creator.image_creator",
            environment=pipeline_job_env,
        ).component(
            port_agg_col=port_agg_col,
            tgt_period_col=tgt_period_col,
            auxilary_link=sweep_step.outputs.auxilary_output_training
        )#type:ignore

        #model selector component
        model_selector_component = command(
            name="model-selector",
            display_name="Model Selector",
            description="Model selector step",
            inputs={
                "model_name": Input(type='string'),
                ...other inputs here...,
                "auxilary_link":Input(type='uri_file')
            },
            code=e.best_model_sources_directory,
            command="python -m best_model_selector.register_best_model",
            environment=pipeline_job_env,
        ).component(
            model_name=model_name, 
            ..other values for inputs...,
            auxilary_link=image_creator_component.outputs.auxilary_output_image_creator
        )#type:ignore
        return
    except Exception as excp:
        logging.error('Unable to create command component.')
        raise excp

The pipeline graph shows the three components: [screenshot omitted]

This is how I configure the output in the train source script. I added this argument:

parser.add_argument('--auxilary_output_training', type=str,
                        help="Auxiliar output for following steps in aml pipeline")

This is how I configure the auxilary_link input in the image_creator source script:

parser.add_argument('--auxilary_link',
                        type=str,
                        help="""Just a dummy input to connect steps in AML pipeline""")
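
For these arguments to receive a value, the component's command string normally has to reference them with the ${{inputs.<name>}} and ${{outputs.<name>}} placeholders. The command strings shown above carry no arguments (they may simply be shortened here). A minimal sketch of how such a command string could be assembled follows; build_command is a hypothetical helper, not part of the SDK, and only two of the inputs are listed.

```python
def build_command(base, inputs=(), outputs=()):
    """Append one "--name ${{inputs.name}}" or "--name ${{outputs.name}}" per argument.

    build_command is a hypothetical helper for illustration; only the
    ${{...}} placeholder syntax comes from AzureML SDK v2.
    """
    parts = [base]
    for name in inputs:
        parts.append("--%s ${{inputs.%s}}" % (name, name))
    for name in outputs:
        parts.append("--%s ${{outputs.%s}}" % (name, name))
    return " ".join(parts)


train_command = build_command(
    "python -m train.main_portfolio",
    inputs=["reg_targets", "clf_targets"],
    outputs=["auxilary_output_training"],
)
print(train_command)
```

The resulting string would be passed as the command= argument of command(...), so that argparse in the script receives the resolved values and paths.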

The problem:

The sweep step uses a train script, and inside that script I call:

joblib.dump(final_fit_zir, args.auxilary_output_training)

This just dumps the model as a joblib file. The output is then passed as input to the next step, image_creator.
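
Since the declared output is a uri_file, the value of --auxilary_output_training is expected to be a file path. Below is a minimal sketch of writing to it defensively by creating the parent folder first. pickle stands in for joblib so the example runs with the standard library only, write_model_output is a hypothetical helper, and the temporary folder only simulates the path the job would receive.

```python
import pickle
import tempfile
from pathlib import Path


def write_model_output(model, output_path):
    """Serialize `model` to the file path received for the uri_file output."""
    target = Path(output_path)
    # Assumption: the parent folder may not exist yet on the compute target.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("wb") as handle:
        pickle.dump(model, handle)  # joblib.dump(model, target) in the real script
    return target


# Usage: a temporary folder simulates the output location.
with tempfile.TemporaryDirectory() as tmp:
    written = write_model_output({"alpha": 0.1}, Path(tmp) / "outputs" / "model.pkl")
    with written.open("rb") as handle:
        restored = pickle.load(handle)
```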

In the UI, I can see that the sweep step produces the output and that all the trial runs produce it too.

Parent run: [screenshot omitted]

Child runs (trials): [screenshot omitted]

In both screenshots the output name is auxilary_output_training, which the image_creator step should use as its input: [screenshot omitted]

In the UI, the image_creator step's inputs are also shown: [screenshot omitted]

The ID of the outputs in the parent and child runs and the ID of the input in the image_creator step are the same.

But I receive a StreamError: [screenshot omitted]

There are no code-related logs because the job fails before the script runs.

Any help or clue would be appreciated.

Thanks,

There are 0 answers